This is an automated email from the ASF dual-hosted git repository. yuzhaojing pushed a commit to branch upstream/release-feature-rfc46 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ec486855e5cd9ccc42c9863d5b8b15a612f0a51e Author: wulei <[email protected]> AuthorDate: Tue Jul 5 02:09:12 2022 +0800 [HUDI-3350][HUDI-3351] Support HoodieMerge API and Spark engine-specific HoodieRecord (#5627) Co-authored-by: wangzixuan.wzxuan <[email protected]> --- .../apache/hudi/config/HoodieCompactionConfig.java | 12 + .../org/apache/hudi/config/HoodieWriteConfig.java | 11 + .../java/org/apache/hudi/io/HoodieMergeHandle.java | 5 +- .../java/org/apache/hudi/io/HoodieWriteHandle.java | 4 + .../hudi/table/action/commit/BaseWriteHelper.java | 7 +- .../table/action/commit/HoodieWriteHelper.java | 11 +- .../hudi/testutils/HoodieWriteableTestTable.java | 13 +- .../org/apache/hudi/table/HoodieFlinkTable.java | 2 - .../hudi/table/action/commit/FlinkWriteHelper.java | 6 +- .../hudi/table/action/commit/JavaWriteHelper.java | 7 +- .../hudi/io/storage/row/HoodieRowCreateHandle.java | 1 - .../spark/sql/HoodieCatalystExpressionUtils.scala | 21 +- .../TestHoodieClientOnCopyOnWriteStorage.java | 7 +- .../io/storage/row/TestHoodieRowCreateHandle.java | 1 - .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 20 +- .../hudi/common/model/HoodieAvroIndexedRecord.java | 17 +- .../apache/hudi/common/model/HoodieAvroRecord.java | 49 +--- .../hudi/common/model/HoodieAvroRecordMerge.java | 57 +++++ .../org/apache/hudi/common/model/HoodieMerge.java | 38 +++ .../org/apache/hudi/common/model/HoodieRecord.java | 36 +-- .../hudi/common/table/HoodieTableConfig.java | 15 ++ .../hudi/common/table/HoodieTableMetaClient.java | 10 + .../hudi/common/table/TableSchemaResolver.java | 1 - .../table/log/AbstractHoodieLogRecordReader.java | 7 + .../table/log/HoodieMergedLogRecordScanner.java | 7 +- .../table/log/block/HoodieParquetDataBlock.java | 2 +- .../apache/hudi/common/util/HoodieRecordUtils.java | 68 +++++ .../apache/hudi/common/util/ReflectionUtils.java | 13 - .../apache/hudi/common/util/SpillableMapUtils.java | 4 +- .../hudi/metadata/HoodieBackedTableMetadata.java | 41 +-- 
.../hudi/common/util/HoodieRecordUtilsTest.java | 47 ++++ .../apache/hudi/configuration/FlinkOptions.java | 8 + .../org/apache/hudi/sink/StreamWriteFunction.java | 15 +- .../apache/hudi/streamer/FlinkStreamerConfig.java | 6 + .../org/apache/hudi/table/HoodieTableSource.java | 3 +- .../table/format/mor/MergeOnReadInputFormat.java | 16 +- .../table/format/mor/MergeOnReadTableState.java | 9 +- .../java/org/apache/hudi/util/StreamerUtil.java | 2 + .../apache/hudi/source/TestStreamReadOperator.java | 3 +- .../test/java/org/apache/hudi/utils/TestData.java | 4 +- .../BulkInsertDataInternalWriterHelper.java | 2 +- .../scala/org/apache/hudi/DataSourceOptions.scala | 6 + .../scala/org/apache/hudi/HoodieBaseRelation.scala | 10 +- .../org/apache/hudi/HoodieMergeOnReadRDD.scala | 21 +- .../org/apache/hudi/HoodieSparkSqlWriter.scala | 2 + .../spark/sql/hudi/HoodieInternalRowUtils.scala | 282 +++++++++++++++++++++ .../apache/spark/sql/hudi/HoodieSparkRecord.java | 190 ++++++++++++++ .../spark/sql/hudi/HoodieSparkRecordMerge.java | 48 ++++ .../org/apache/spark/sql/hudi/SparkHelpers.scala | 2 +- .../apache/hudi/TestHoodieInternalRowUtils.scala | 114 +++++++++ .../hudi/TestStructTypeSchemaEvolutionUtils.scala | 222 ++++++++++++++++ .../hudi/utilities/deltastreamer/DeltaSync.java | 2 + .../deltastreamer/HoodieDeltaStreamer.java | 5 + 53 files changed, 1325 insertions(+), 187 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index d1d0e67261..0b7311b701 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.config.ConfigClassProperty; import org.apache.hudi.common.config.ConfigGroups; import 
org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.model.HoodieAvroRecordMerge; import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy; @@ -113,6 +114,12 @@ public class HoodieCompactionConfig extends HoodieConfig { + "compaction during each compaction run. By default. Hudi picks the log file " + "with most accumulated unmerged data"); + public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty + .key("hoodie.compaction.merge.class") + .defaultValue(HoodieAvroRecordMerge.class.getName()) + .withDocumentation("Merge class provide stateless component interface for merging records, and support various HoodieRecord " + + "types, such as Spark records or Flink records."); + public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLE = ConfigProperty .key("hoodie.compaction.lazy.block.read") .defaultValue("true") @@ -346,6 +353,11 @@ public class HoodieCompactionConfig extends HoodieConfig { return this; } + public Builder withMergeClass(String mergeClass) { + compactionConfig.setValue(MERGE_CLASS_NAME, mergeClass); + return this; + } + public Builder withTargetIOPerCompactionInMB(long targetIOPerCompactionInMB) { compactionConfig.setValue(TARGET_IO_PER_COMPACTION_IN_MB, String.valueOf(targetIOPerCompactionInMB)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 2cb9e8ce82..8d903cffac 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -33,6 +33,7 @@ import 
org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FileSystemRetryConfig; +import org.apache.hudi.common.model.HoodieAvroRecordMerge; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileFormat; @@ -125,6 +126,12 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. " + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective"); + public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty + .key("hoodie.datasource.write.merge.class") + .defaultValue(HoodieAvroRecordMerge.class.getName()) + .withDocumentation("Merge class provide stateless component interface for merging records, and support various HoodieRecord " + + "types, such as Spark records or Flink records."); + public static final ConfigProperty<String> KEYGENERATOR_CLASS_NAME = ConfigProperty .key("hoodie.datasource.write.keygenerator.class") .noDefaultValue() @@ -1321,6 +1328,10 @@ public class HoodieWriteConfig extends HoodieConfig { return getString(HoodiePayloadConfig.PAYLOAD_CLASS_NAME); } + public String getMergeClass() { + return getString(HoodieCompactionConfig.MERGE_CLASS_NAME); + } + public int getTargetPartitionsPerDayBasedCompaction() { return getInt(HoodieCompactionConfig.TARGET_PARTITIONS_PER_DAYBASED_COMPACTION); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 3b6bda7877..fff4aa6d05 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -338,10 +338,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> // writing the first record. So make a copy of the record to be merged HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key).newInstance(); try { - Option<HoodieRecord> combinedRecord = - hoodieRecord.combineAndGetUpdateValue(oldRecord, - schema, - props); + Option<HoodieRecord> combinedRecord = merge.combineAndGetUpdateValue(oldRecord, hoodieRecord, schema, props); if (combinedRecord.isPresent() && combinedRecord.get().shouldIgnore(schema, props)) { // If it is an IGNORE_RECORD, just copy the old record, and do not update the new record. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 09f8831f8b..9ee6e0884d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -27,7 +27,9 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieMerge; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; @@ -59,6 +61,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I, */ protected final Schema tableSchema; protected final Schema tableSchemaWithMetaFields; + protected final HoodieMerge merge; /** * The write schema. 
In most case the write schema is the same to the @@ -103,6 +106,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I, this.taskContextSupplier = taskContextSupplier; this.writeToken = makeWriteToken(); schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema()); + this.merge = HoodieRecordUtils.loadMerge(config.getMergeClass()); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java index d8682152fa..1efe3d9641 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java @@ -19,7 +19,9 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieMerge; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; @@ -81,9 +83,10 @@ public abstract class BaseWriteHelper<T, I, K, O, R> { */ public I deduplicateRecords( I records, HoodieTable<T, I, K, O> table, int parallelism) { - return deduplicateRecords(records, table.getIndex(), parallelism); + HoodieMerge merge = HoodieRecordUtils.loadMerge(table.getConfig().getMergeClass()); + return deduplicateRecords(records, table.getIndex(), parallelism, merge); } public abstract I deduplicateRecords( - I records, HoodieIndex<?, ?> index, int parallelism); + I records, HoodieIndex<?, ?> index, int parallelism, HoodieMerge merge); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java index e24cd71ab6..57bb511c63 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java @@ -23,13 +23,13 @@ import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieMerge; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> { - private HoodieWriteHelper() { } @@ -49,7 +49,7 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi @Override public HoodieData<HoodieRecord<T>> deduplicateRecords( - HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) { + HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, HoodieMerge merge) { boolean isIndexingGlobal = index.isGlobal(); return records.mapToPair(record -> { HoodieKey hoodieKey = record.getKey(); @@ -58,10 +58,9 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi return Pair.of(key, record); }).reduceByKey((rec1, rec2) -> { @SuppressWarnings("unchecked") - HoodieRecord reducedRec = rec2.preCombine(rec1); - HoodieKey reducedKey = rec1.getData().equals(reducedRec) ? rec1.getKey() : rec2.getKey(); - - return (HoodieRecord<T>) reducedRec.newInstance(reducedKey); + HoodieRecord<T> reducedRecord = merge.preCombine(rec1, rec2); + HoodieKey reducedKey = rec1.getData().equals(reducedRecord.getData()) ? 
rec1.getKey() : rec2.getKey(); + return reducedRecord.newInstance(reducedKey); }, parallelism).map(Pair::getRight); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java index d2ecd09a23..a952576026 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java @@ -19,6 +19,12 @@ package org.apache.hudi.testutils; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.common.bloom.BloomFilter; @@ -44,13 +50,6 @@ import org.apache.hudi.io.storage.HoodieAvroParquetWriter; import org.apache.hudi.io.storage.HoodieOrcConfig; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.metadata.HoodieTableMetadataWriter; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.orc.CompressionKind; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java index 1ae12780f0..e3e25fa9a9 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java +++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java @@ -39,8 +39,6 @@ import org.apache.avro.specific.SpecificRecordBase; import java.util.List; -import static org.apache.hudi.common.data.HoodieList.getList; - public abstract class HoodieFlinkTable<T> extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> implements ExplicitWriteHandleTable<T> { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java index 20b8b9fcf3..ce6107714d 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieMerge; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.index.HoodieIndex; @@ -86,13 +87,14 @@ public class FlinkWriteHelper<T, R> extends BaseWriteHelper<T, List<HoodieRecord @Override public List<HoodieRecord<T>> deduplicateRecords( - List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) { + List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, HoodieMerge merge) { // If index used is global, then records are expected to differ in their partitionPath Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream() .collect(Collectors.groupingBy(record -> record.getKey().getRecordKey())); return keyedRecords.values().stream().map(x -> x.stream().reduce((rec1, rec2) -> { - @SuppressWarnings("unchecked") final 
HoodieRecord reducedRec = rec2.preCombine(rec1); + @SuppressWarnings("unchecked") + final HoodieRecord reducedRec = merge.preCombine(rec1, rec2); // we cannot allow the user to change the key or partitionPath, since that will affect // everything // so pick it from one of the records. diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java index f002c208d1..39c60447c5 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieMerge; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; @@ -53,7 +54,7 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, List<HoodieRecord<T @Override public List<HoodieRecord<T>> deduplicateRecords( - List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) { + List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, HoodieMerge merge) { boolean isIndexingGlobal = index.isGlobal(); Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> { HoodieKey hoodieKey = record.getKey(); @@ -64,11 +65,11 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, List<HoodieRecord<T return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> { @SuppressWarnings("unchecked") - HoodieRecord reducedRec = rec2.preCombine(rec1); + HoodieRecord<T> reducedRecord = 
merge.preCombine(rec1,rec2); // we cannot allow the user to change the key or partitionPath, since that will affect // everything // so pick it from one of the records. - return (HoodieRecord<T>) reducedRec.newInstance(rec1.getKey()); + return reducedRecord.newInstance(rec1.getKey()); }).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList()); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java index 2b581004dc..e7c6ccd6fa 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java @@ -23,7 +23,6 @@ import org.apache.hudi.client.model.HoodieInternalRow; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.util.HoodieTimer; diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala index a3b9c210b9..636dd299fe 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction} -import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection -import 
org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, SubqueryExpression, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateMutableProjection, GenerateUnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, MutableProjection, SubqueryExpression, UnsafeProjection} import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan} import org.apache.spark.sql.types.StructType @@ -42,6 +42,23 @@ trait HoodieCatalystExpressionUtils { GenerateUnsafeProjection.generate(targetExprs, attrs) } + /** + * Generates instance of [[MutableProjection]] projecting row of one [[StructType]] into another [[StructType]] + * + * NOTE: No safety checks are executed to validate that this projection is actually feasible, + * it's up to the caller to make sure that such projection is possible. + * + * NOTE: Projection of the row from [[StructType]] A to [[StructType]] B is only possible, if + * B is a subset of A + */ + def generateMutableProjection(from: StructType, to: StructType): MutableProjection = { + val attrs = from.toAttributes + val attrsMap = attrs.map(attr => (attr.name, attr)).toMap + val targetExprs = to.fields.map(f => attrsMap(f.name)) + + GenerateMutableProjection.generate(targetExprs, attrs) + } + /** * Parses and resolves expression against the attributes of the given table schema. 
* diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index 8ba459b772..05588a80ba 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -38,12 +38,14 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieAvroRecordMerge; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieMerge; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.IOType; @@ -462,7 +464,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { // Global dedup should be done based on recordKey only HoodieIndex index = mock(HoodieIndex.class); when(index.isGlobal()).thenReturn(true); - List<HoodieRecord<RawTripTestPayload>> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList(); + HoodieMerge merge = new HoodieAvroRecordMerge(); + List<HoodieRecord<RawTripTestPayload>> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, merge).collectAsList(); assertEquals(1, dedupedRecs.size()); 
assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath()); assertNodupesWithinPartition(dedupedRecs); @@ -470,7 +473,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { // non-Global dedup should be done based on both recordKey and partitionPath index = mock(HoodieIndex.class); when(index.isGlobal()).thenReturn(false); - dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList(); + dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1, merge).collectAsList(); assertEquals(2, dedupedRecs.size()); assertNodupesWithinPartition(dedupedRecs); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java index b4e75a02b2..ad73a256a6 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java @@ -21,7 +21,6 @@ package org.apache.hudi.io.storage.row; import org.apache.hudi.client.HoodieInternalWriteStatus; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.StringUtils; diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 463ea82f96..81e0032157 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -22,9 +22,9 @@ import 
org.apache.hudi.common.config.SerializableSchema; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -768,9 +768,7 @@ public class HoodieAvroUtils { Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord); switch (newSchema.getType()) { case RECORD: - if (!(oldRecord instanceof IndexedRecord)) { - throw new IllegalArgumentException("cannot rewrite record with different type"); - } + ValidationUtils.checkArgument(oldRecord instanceof IndexedRecord, "cannot rewrite record with different type"); IndexedRecord indexedRecord = (IndexedRecord) oldRecord; List<Schema.Field> fields = newSchema.getFields(); GenericData.Record newRecord = new GenericData.Record(newSchema); @@ -802,9 +800,7 @@ public class HoodieAvroUtils { } return newRecord; case ARRAY: - if (!(oldRecord instanceof Collection)) { - throw new IllegalArgumentException("cannot rewrite record with different type"); - } + ValidationUtils.checkArgument(oldRecord instanceof Collection, "cannot rewrite record with different type"); Collection array = (Collection)oldRecord; List<Object> newArray = new ArrayList(); fieldNames.push("element"); @@ -814,9 +810,7 @@ public class HoodieAvroUtils { fieldNames.pop(); return newArray; case MAP: - if (!(oldRecord instanceof Map)) { - throw new IllegalArgumentException("cannot rewrite record with different type"); - } + ValidationUtils.checkArgument(oldRecord instanceof Map, "cannot rewrite record with different type"); Map<Object, Object> map = (Map<Object, Object>) oldRecord; Map<Object, 
Object> newMap = new HashMap<>(); fieldNames.push("value"); @@ -832,7 +826,7 @@ public class HoodieAvroUtils { } } - private static String createFullName(Deque<String> fieldNames) { + public static String createFullName(Deque<String> fieldNames) { String result = ""; if (!fieldNames.isEmpty()) { List<String> parentNames = new ArrayList<>(); @@ -967,7 +961,7 @@ public class HoodieAvroUtils { } // convert days to Date - private static java.sql.Date toJavaDate(int days) { + public static java.sql.Date toJavaDate(int days) { long localMillis = Math.multiplyExact(days, MILLIS_PER_DAY); int timeZoneOffset; TimeZone defaultTimeZone = TimeZone.getDefault(); @@ -980,7 +974,7 @@ public class HoodieAvroUtils { } // convert Date to days - private static int fromJavaDate(Date date) { + public static int fromJavaDate(Date date) { long millisUtc = date.getTime(); long millisLocal = millisUtc + TimeZone.getDefault().getOffset(millisUtc); int julianDays = Math.toIntExact(Math.floorDiv(millisLocal, MILLIS_PER_DAY)); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java index ac2df00151..daec2fee03 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java @@ -48,7 +48,7 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> { } public HoodieAvroIndexedRecord(HoodieKey key, IndexedRecord data, HoodieOperation operation) { - super(key, data, operation); + super(key, data, operation, null); } public HoodieAvroIndexedRecord(HoodieRecord<IndexedRecord> record) { @@ -67,11 +67,6 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> { return Option.of(data); } - @Override - public Comparable<?> getOrderingValue() { - throw new UnsupportedOperationException(); - } - @Override public HoodieRecord 
newInstance() { throw new UnsupportedOperationException(); @@ -99,16 +94,6 @@ public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> { .map(Object::toString).orElse(null); } - @Override - public HoodieRecord preCombine(HoodieRecord<IndexedRecord> previousRecord) { - throw new UnsupportedOperationException(); - } - - @Override - public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord previousRecord, Schema schema, Properties props) throws IOException { - return Option.empty(); - } - @Override public HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException { ValidationUtils.checkState(other instanceof HoodieAvroIndexedRecord); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java index 9a9011da37..7bc6af89f8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java @@ -21,11 +21,10 @@ package org.apache.hudi.common.model; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.keygen.BaseKeyGenerator; -import org.apache.hudi.metadata.HoodieMetadataPayload; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -39,7 +38,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; -import static org.apache.hudi.TypeUtils.unsafeCast; +import static org.apache.hudi.common.util.TypeUtils.unsafeCast; public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecord<T> { public HoodieAvroRecord(HoodieKey key, T data) { @@ -47,7 +46,7 @@ public class 
HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor } public HoodieAvroRecord(HoodieKey key, T data, HoodieOperation operation) { - super(key, data, operation); + super(key, data, operation, null); } public HoodieAvroRecord(HoodieRecord<T> record) { @@ -106,34 +105,6 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor // NOTE: This method duplicates those ones of the HoodieRecordPayload and are placed here // for the duration of RFC-46 implementation, until migration off `HoodieRecordPayload` // is complete - // - // TODO cleanup - - // NOTE: This method is assuming semantic that `preCombine` operation is bound to pick one or the other - // object, and may not create a new one - @Override - public HoodieRecord<T> preCombine(HoodieRecord<T> previousRecord) { - T picked = unsafeCast(getData().preCombine(previousRecord.getData())); - if (picked instanceof HoodieMetadataPayload) { - // NOTE: HoodieMetadataPayload return a new payload - return new HoodieAvroRecord<>(getKey(), picked, getOperation()); - } - return picked.equals(getData()) ? 
this : previousRecord; - } - - // NOTE: This method is assuming semantic that only records bearing the same (partition, key) could - // be combined - @Override - public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord previousRecord, Schema schema, Properties props) throws IOException { - Option<IndexedRecord> previousRecordAvroPayload = previousRecord.toIndexedRecord(schema, props); - if (!previousRecordAvroPayload.isPresent()) { - return Option.empty(); - } - - return getData().combineAndGetUpdateValue(previousRecordAvroPayload.get(), schema, props) - .map(combinedAvroPayload -> new HoodieAvroIndexedRecord((IndexedRecord) combinedAvroPayload)); - } - @Override public HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException { ValidationUtils.checkState(other instanceof HoodieAvroRecord); @@ -141,7 +112,7 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor (GenericRecord) toIndexedRecord(readerSchema, new Properties()).get(), (GenericRecord) other.toIndexedRecord(readerSchema, new Properties()).get(), writerSchema); - return new HoodieAvroRecord(getKey(), instantiateRecordPayloadWrapper(mergedPayload, getPrecombineValue(getData())), getOperation()); + return new HoodieAvroRecord(getKey(), instantiateRecordPayloadWrapper(mergedPayload, getOrderingValue()), getOperation()); } @Override @@ -234,20 +205,10 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor @Nonnull private T instantiateRecordPayloadWrapper(Object combinedAvroPayload, Comparable newPreCombineVal) { return unsafeCast( - ReflectionUtils.loadPayload( + HoodieRecordUtils.loadPayload( getData().getClass().getCanonicalName(), new Object[]{combinedAvroPayload, newPreCombineVal}, GenericRecord.class, Comparable.class)); } - - private static <T extends HoodieRecordPayload> Comparable getPrecombineValue(T data) { - if (data instanceof BaseAvroPayload) { - return ((BaseAvroPayload) 
data).orderingVal; - } - - return -1; - } - - ////////////////////////////////////////////////////////////////////////////// } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerge.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerge.java new file mode 100644 index 0000000000..bea89efdc1 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerge.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.model; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.metadata.HoodieMetadataPayload; + +import java.io.IOException; +import java.util.Properties; + +import static org.apache.hudi.common.util.TypeUtils.unsafeCast; + +public class HoodieAvroRecordMerge implements HoodieMerge { + @Override + public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) { + HoodieRecordPayload picked = unsafeCast(((HoodieAvroRecord) newer).getData().preCombine(((HoodieAvroRecord) older).getData())); + if (picked instanceof HoodieMetadataPayload) { + // NOTE: HoodieMetadataPayload return a new payload + return new HoodieAvroRecord(newer.getKey(), picked, newer.getOperation()); + } + return picked.equals(((HoodieAvroRecord) newer).getData()) ? newer : older; + } + + @Override + public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException { + Option<IndexedRecord> previousRecordAvroPayload; + if (older instanceof HoodieAvroIndexedRecord) { + previousRecordAvroPayload = Option.ofNullable(((HoodieAvroIndexedRecord) older).getData()); + } else { + previousRecordAvroPayload = ((HoodieRecordPayload)older.getData()).getInsertValue(schema, props); + } + if (!previousRecordAvroPayload.isPresent()) { + return Option.empty(); + } + + return ((HoodieAvroRecord) newer).getData().combineAndGetUpdateValue(previousRecordAvroPayload.get(), schema, props) + .map(combinedAvroPayload -> new HoodieAvroIndexedRecord((IndexedRecord) combinedAvroPayload)); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMerge.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMerge.java new file mode 100644 index 0000000000..6becf35591 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieMerge.java @@ -0,0 +1,38 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.avro.Schema; +import org.apache.hudi.common.util.Option; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Properties; + +/** + * HoodieMerge defines how to merge two records. It is a stateless component. + * It can implement the merging logic of HoodieRecord of different engines + * and avoid the performance consumption caused by the serialization/deserialization of Avro payload. 
+ */ +public interface HoodieMerge extends Serializable { + + HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer); + + Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException; +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java index 6b54933d6a..b26f1d296e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java @@ -135,25 +135,35 @@ public abstract class HoodieRecord<T> implements Serializable { */ private HoodieOperation operation; + /** + * For purposes of preCombining. + */ + private Comparable<?> orderingVal; + public HoodieRecord(HoodieKey key, T data) { - this(key, data, null); + this(key, data, null, null); } - public HoodieRecord(HoodieKey key, T data, HoodieOperation operation) { + public HoodieRecord(HoodieKey key, T data, Comparable<?> orderingVal) { + this(key, data, null, orderingVal); + } + + public HoodieRecord(HoodieKey key, T data, HoodieOperation operation, Comparable<?> orderingVal) { this.key = key; this.data = data; this.currentLocation = null; this.newLocation = null; this.sealed = false; this.operation = operation; + // default natural order is 0 + this.orderingVal = orderingVal == null ? 
0 : orderingVal; } public HoodieRecord(HoodieRecord<T> record) { - this(record.key, record.data); + this(record.key, record.data, record.operation, record.orderingVal); this.currentLocation = record.currentLocation; this.newLocation = record.newLocation; this.sealed = record.sealed; - this.operation = record.operation; } public HoodieRecord() { @@ -173,15 +183,17 @@ public abstract class HoodieRecord<T> implements Serializable { return operation; } + public Comparable<?> getOrderingValue() { + return orderingVal; + } + public T getData() { if (data == null) { - throw new IllegalStateException("Payload already deflated for record."); + throw new IllegalStateException("HoodieRecord already deflated for record."); } return data; } - public abstract Comparable<?> getOrderingValue(); - /** * Release the actual payload, to ease memory pressure. To be called after the record has been written to storage. * Once deflated, cannot be inflated. @@ -285,16 +297,6 @@ public abstract class HoodieRecord<T> implements Serializable { // for the duration of RFC-46 implementation, until migration off `HoodieRecordPayload` // is complete // - // TODO cleanup - - // NOTE: This method is assuming semantic that `preCombine` operation is bound to pick one or the other - // object, and may not create a new one - public abstract HoodieRecord<T> preCombine(HoodieRecord<T> previousRecord); - - // NOTE: This method is assuming semantic that only records bearing the same (partition, key) could - // be combined - public abstract Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord previousRecord, Schema schema, Properties props) throws IOException; - public abstract HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException; public abstract HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index 6b64ec4897..aae29a21bb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.OrderedProperties; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieAvroRecordMerge; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; @@ -155,6 +156,12 @@ public class HoodieTableConfig extends HoodieConfig { .withDocumentation("Payload class to use for performing compactions, i.e merge delta logs with current base file and then " + " produce a new base file."); + public static final ConfigProperty<String> MERGE_CLASS_NAME = ConfigProperty + .key("hoodie.compaction.merge.class") + .defaultValue(HoodieAvroRecordMerge.class.getName()) + .withDocumentation("Merge class provide stateless component interface for merging records, and support various HoodieRecord " + + "types, such as Spark records or Flink records."); + public static final ConfigProperty<String> ARCHIVELOG_FOLDER = ConfigProperty .key("hoodie.archivelog.folder") .defaultValue("archived") @@ -481,6 +488,14 @@ public class HoodieTableConfig extends HoodieConfig { "org.apache.hudi"); } + /** + * Read the hoodie merge class for HoodieRecords from the table properties. 
+ */ + public String getMergeClass() { + return getStringOrDefault(MERGE_CLASS_NAME).replace("com.uber.hoodie", + "org.apache.hudi"); + } + public String getPreCombineField() { return getString(PRECOMBINE_FIELD); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 529b0e8c99..b0da7a9203 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -725,6 +725,7 @@ public class HoodieTableMetaClient implements Serializable { private String recordKeyFields; private String archiveLogFolder; private String payloadClassName; + private String mergeClassName; private Integer timelineLayoutVersion; private String baseFileFormat; private String preCombineField; @@ -791,6 +792,11 @@ public class HoodieTableMetaClient implements Serializable { return this; } + public PropertyBuilder setMergeClassName(String mergeClassName) { + this.mergeClassName = mergeClassName; + return this; + } + public PropertyBuilder setPayloadClass(Class<? 
extends HoodieRecordPayload> payloadClass) { return setPayloadClassName(payloadClass.getName()); } @@ -1004,6 +1010,10 @@ public class HoodieTableMetaClient implements Serializable { tableConfig.setValue(HoodieTableConfig.PAYLOAD_CLASS_NAME, payloadClassName); } + if (mergeClassName != null) { + tableConfig.setValue(HoodieTableConfig.MERGE_CLASS_NAME, mergeClassName); + } + if (null != tableCreateSchema) { tableConfig.setValue(HoodieTableConfig.CREATE_SCHEMA, tableCreateSchema); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index f7373fde3b..1c31e0120c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -22,7 +22,6 @@ import org.apache.avro.JsonProperties; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.SchemaCompatibility; -import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 1f27028e26..afad95c4aa 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -96,6 +96,8 @@ public abstract class AbstractHoodieLogRecordReader { private final String payloadClassFQN; // preCombine field private final String preCombineField; + // Stateless component for merging records + private final String mergeClassFQN; // simple key gen fields private Option<Pair<String, String>> simpleKeyGenFields = Option.empty(); // Log 
File Paths @@ -160,6 +162,7 @@ public abstract class AbstractHoodieLogRecordReader { HoodieTableConfig tableConfig = this.hoodieTableMetaClient.getTableConfig(); this.payloadClassFQN = tableConfig.getPayloadClass(); this.preCombineField = tableConfig.getPreCombineField(); + this.mergeClassFQN = tableConfig.getMergeClass(); this.totalLogFiles.addAndGet(logFilePaths.size()); this.logFilePaths = logFilePaths; this.reverseReader = reverseReader; @@ -525,6 +528,10 @@ public abstract class AbstractHoodieLogRecordReader { return payloadClassFQN; } + protected String getMergeClassFQN() { + return mergeClassFQN; + } + public Option<String> getPartitionName() { return partitionName; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index 6d313b64f9..dfc3c14b5b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -24,10 +24,12 @@ import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieMerge; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.HoodieRecordSizeEstimator; +import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; @@ -78,6 +80,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader // Stores the total time taken to perform reading and merging of log blocks 
private long totalTimeTakenToReadAndMergeBlocks; + private final HoodieMerge merge; + @SuppressWarnings("unchecked") protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily, @@ -95,6 +99,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled); this.maxMemorySizeInBytes = maxMemorySizeInBytes; + this.merge = HoodieRecordUtils.loadMerge(getMergeClassFQN()); } catch (IOException e) { throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e); } @@ -150,7 +155,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key); HoodieRecordPayload oldValue = oldRecord.getData(); - HoodieRecordPayload combinedValue = (HoodieRecordPayload)hoodieRecord.preCombine(oldRecord).getData(); + HoodieRecordPayload combinedValue = (HoodieRecordPayload) merge.preCombine(oldRecord, hoodieRecord).getData(); // If combinedValue is oldValue, no need rePut oldRecord if (combinedValue != oldValue) { HoodieOperation operation = hoodieRecord.getOperation(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java index ebe53fe471..98f5dcf3a0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java @@ -31,8 +31,8 @@ import org.apache.hudi.common.util.ClosableIterator; import 
org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetReaderIterator; -import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.io.storage.HoodieAvroFileReader.HoodieRecordTransformIterator; +import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.io.storage.HoodieParquetStreamWriter; import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.avro.AvroReadSupport; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java new file mode 100644 index 0000000000..075d117fe2 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.util; + +import org.apache.hudi.common.model.HoodieMerge; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.exception.HoodieException; + +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.Map; + +/** + * A utility class for HoodieRecord. + */ +public class HoodieRecordUtils { + + private static final Map<String, Object> INSTANCE_CACHE = new HashMap<>(); + + /** + * Instantiate a given class with a record merge. + */ + public static HoodieMerge loadMerge(String mergeClass) { + try { + HoodieMerge merge = (HoodieMerge) INSTANCE_CACHE.get(mergeClass); + if (null == merge) { + synchronized (HoodieMerge.class) { + merge = (HoodieMerge) INSTANCE_CACHE.get(mergeClass); + if (null == merge) { + merge = (HoodieMerge)ReflectionUtils.loadClass(mergeClass, new Object[]{}); + INSTANCE_CACHE.put(mergeClass, merge); + } + } + } + return merge; + } catch (HoodieException e) { + throw new HoodieException("Unable to instantiate hoodie merge class ", e); + } + } + + /** + * Instantiate a given class with an avro record payload. + */ + public static <T extends HoodieRecordPayload> T loadPayload(String recordPayloadClass, Object[] payloadArgs, + Class<?>... 
constructorArgTypes) { + try { + return (T) ReflectionUtils.getClass(recordPayloadClass).getConstructor(constructorArgTypes).newInstance(payloadArgs); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + throw new HoodieException("Unable to instantiate payload class ", e); + } + } +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java index 6ee7928c75..b3e178320b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java @@ -18,7 +18,6 @@ package org.apache.hudi.common.util; -import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.exception.HoodieException; import org.apache.log4j.LogManager; @@ -69,18 +68,6 @@ public class ReflectionUtils { } } - /** - * Instantiate a given class with a generic record payload. - */ - public static <T extends HoodieRecordPayload> T loadPayload(String recordPayloadClass, Object[] payloadArgs, - Class<?>... constructorArgTypes) { - try { - return (T) getClass(recordPayloadClass).getConstructor(constructorArgTypes).newInstance(payloadArgs); - } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - throw new HoodieException("Unable to instantiate payload class ", e); - } - } - /** * Creates an instance of the given class. Use this version when dealing with interface types as constructor args. 
*/ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java index d4bafd9c9f..d2d91bbfb6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java @@ -137,7 +137,7 @@ public class SpillableMapUtils { HoodieOperation operation = withOperationField ? HoodieOperation.fromName(getNullableValAsString(record, HoodieRecord.OPERATION_METADATA_FIELD)) : null; HoodieRecord<? extends HoodieRecordPayload> hoodieRecord = new HoodieAvroRecord<>(new HoodieKey(recKey, partitionPath), - ReflectionUtils.loadPayload(payloadClazz, new Object[]{record, preCombineVal}, GenericRecord.class, + HoodieRecordUtils.loadPayload(payloadClazz, new Object[]{record, preCombineVal}, GenericRecord.class, Comparable.class), operation); return (R) hoodieRecord; @@ -163,7 +163,7 @@ public class SpillableMapUtils { */ public static <R> R generateEmptyPayload(String recKey, String partitionPath, Comparable orderingVal, String payloadClazz) { HoodieRecord<? 
extends HoodieRecordPayload> hoodieRecord = new HoodieAvroRecord<>(new HoodieKey(recKey, partitionPath), - ReflectionUtils.loadPayload(payloadClazz, new Object[] {null, orderingVal}, GenericRecord.class, Comparable.class)); + HoodieRecordUtils.loadPayload(payloadClazz, new Object[] {null, orderingVal}, GenericRecord.class, Comparable.class)); return (R) hoodieRecord; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index c67f0e1b98..360af469c2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -157,9 +157,10 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { List<FileSlice> partitionFileSlices = HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName); - return engineContext.parallelize(partitionFileSlices) + return (shouldLoadInMemory ? HoodieListData.lazy(partitionFileSlices) : + engineContext.parallelize(partitionFileSlices)) .flatMap( - (SerializableFunction<FileSlice, Iterator<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>>) fileSlice -> { + (SerializableFunction<FileSlice, Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> { // NOTE: Since this will be executed by executors, we can't access previously cached // readers, and therefore have to always open new ones Pair<HoodieAvroFileReader, HoodieMetadataMergedLogRecordReader> readers = @@ -170,31 +171,31 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { HoodieAvroFileReader baseFileReader = readers.getKey(); HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight(); - if (baseFileReader == null && logRecordScanner == null) { - // TODO: what do we do if both does not exist? should we throw an exception and let caller do the fallback ? 
- return Collections.emptyIterator(); - } + if (baseFileReader == null && logRecordScanner == null) { + // TODO: what do we do if both does not exist? should we throw an exception and let caller do the fallback ? + return Collections.emptyIterator(); + } - boolean fullKeys = false; + boolean fullKeys = false; - Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = - readLogRecords(logRecordScanner, sortedKeyPrefixes, fullKeys, timings); + Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = + readLogRecords(logRecordScanner, sortedKeyPrefixes, fullKeys, timings); - List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords = - readFromBaseAndMergeWithLogRecords(baseFileReader, sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName); + List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords = + readFromBaseAndMergeWithLogRecords(baseFileReader, sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName); - LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms", - sortedKeyPrefixes.size(), timings)); + LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms", + sortedKeyPrefixes.size(), timings)); - return mergedRecords.stream() + return mergedRecords.stream() .map(keyRecordPair -> keyRecordPair.getValue().orElse(null)) .iterator(); - } catch (IOException ioe) { - throw new HoodieIOException("Error merging records from metadata table for " + sortedKeyPrefixes.size() + " key : ", ioe); - } finally { - closeReader(readers); - } - }) + } catch (IOException ioe) { + throw new HoodieIOException("Error merging records from metadata table for " + sortedKeyPrefixes.size() + " key : ", ioe); + } finally { + closeReader(readers); + } + }) .filter(Objects::nonNull); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java 
b/hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java new file mode 100644 index 0000000000..0c51571c9e --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/HoodieRecordUtilsTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.util; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.model.DefaultHoodieRecordPayload; +import org.apache.hudi.common.model.HoodieAvroRecordMerge; +import org.apache.hudi.common.model.HoodieMerge; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class HoodieRecordUtilsTest { + + @Test + void loadHoodieMerge() { + String mergeClassName = HoodieAvroRecordMerge.class.getName(); + HoodieMerge merge1 = HoodieRecordUtils.loadMerge(mergeClassName); + HoodieMerge merge2 = HoodieRecordUtils.loadMerge(mergeClassName); + assertEquals(merge1.getClass().getName(), mergeClassName); + assertEquals(merge1, merge2); + } + + @Test + void loadPayload() { + String payloadClassName = DefaultHoodieRecordPayload.class.getName(); + HoodieRecordPayload payload = HoodieRecordUtils.loadPayload(payloadClassName, new Object[]{null, 0}, GenericRecord.class, Comparable.class); + assertEquals(payload.getClass().getName(), payloadClassName); + } +} \ No newline at end of file diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 933c112312..118ff5ccc7 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -22,6 +22,7 @@ import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringP import org.apache.hudi.common.config.ConfigClassProperty; import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.model.HoodieAvroRecordMerge; import org.apache.hudi.common.model.HoodieCleaningPolicy; import 
org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; @@ -291,6 +292,13 @@ public class FlinkOptions extends HoodieConfig { .withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n" + "This will render any value set for the option in-effective"); + public static final ConfigOption<String> MERGE_CLASS_NAME = ConfigOptions + .key("write.merge.class") + .stringType() + .defaultValue(HoodieAvroRecordMerge.class.getName()) + .withDescription("Merge class provide stateless component interface for merging records, and support various HoodieRecord " + + "types, such as Spark records or Flink records."); + /** * Flag to indicate whether to drop duplicates before insert/upsert. * By default false to gain extra performance. diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java index 2748af5290..b6b65bb8f1 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java @@ -23,9 +23,11 @@ import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieMerge; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.common.util.ObjectSizeCalculator; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.configuration.FlinkOptions; @@ -102,6 +104,8 @@ public class StreamWriteFunction<I> 
extends AbstractStreamWriteFunction<I> { private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> writeFunction; + private transient HoodieMerge merge; + /** * Total size tracer. */ @@ -121,6 +125,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> { this.tracer = new TotalSizeTracer(this.config); initBuffer(); initWriteFunction(); + initMergeClass(); } @Override @@ -195,6 +200,12 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> { } } + private void initMergeClass() { + String mergeClassName = metaClient.getTableConfig().getMergeClass(); + LOG.info("init hoodie merge with class [{}]", mergeClassName); + merge = HoodieRecordUtils.loadMerge(mergeClassName); + } + /** * Represents a data item in the buffer, this is needed to reduce the * memory footprint. @@ -421,7 +432,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> { List<HoodieRecord> records = bucket.writeBuffer(); ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records"); if (config.getBoolean(FlinkOptions.PRE_COMBINE)) { - records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1); + records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, merge); } bucket.preWrite(records); final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant)); @@ -456,7 +467,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> { List<HoodieRecord> records = bucket.writeBuffer(); if (records.size() > 0) { if (config.getBoolean(FlinkOptions.PRE_COMBINE)) { - records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1); + records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1, merge); } bucket.preWrite(records); writeStatus.addAll(writeFunction.apply(records, currentInstant)); diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 3a69e5d080..712bd9b782 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -19,6 +19,7 @@ package org.apache.hudi.streamer; import org.apache.hudi.client.utils.OperationConverter; +import org.apache.hudi.common.model.HoodieAvroRecordMerge; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.WriteOperationType; @@ -118,6 +119,10 @@ public class FlinkStreamerConfig extends Configuration { + "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value.") public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName(); + @Parameter(names = {"--merge-class"}, description = "Implements of HoodieMerge, that defines how to merge two records." 
+ + "Implement your own, if you want to implement specific record merge logic.") + public String mergeClassName = HoodieAvroRecordMerge.class.getName(); + @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input " + "is purely new data/inserts to gain speed).", converter = OperationConverter.class) public WriteOperationType operation = WriteOperationType.UPSERT; @@ -357,6 +362,7 @@ public class FlinkStreamerConfig extends Configuration { conf.setString(FlinkOptions.OPERATION, config.operation.value()); conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField); conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, config.payloadClassName); + conf.setString(FlinkOptions.MERGE_CLASS_NAME, config.mergeClassName); conf.setBoolean(FlinkOptions.PRE_COMBINE, config.preCombine); conf.setInteger(FlinkOptions.RETRY_TIMES, Integer.parseInt(config.instantRetryTimes)); conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, Long.parseLong(config.instantRetryInterval)); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 2034cb322e..c63b98b553 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -421,7 +421,8 @@ public class HoodieTableSource implements tableAvroSchema.toString(), AvroSchemaConverter.convertToSchema(requiredRowType).toString(), inputSplits, - conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",")); + conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","), + conf.getString(FlinkOptions.MERGE_CLASS_NAME)); return MergeOnReadInputFormat.builder() .config(this.conf) .tableState(hoodieTableState) diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 76e9e60ee0..6dc6c76082 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -18,12 +18,15 @@ package org.apache.hudi.table.format.mor; +import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieAvroRecord; import org.apache.hudi.common.model.HoodieOperation; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieMerge; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.log.InstantRange; import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.common.util.HoodieRecordUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; @@ -63,6 +66,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; +import java.util.Properties; import java.util.Set; import java.util.stream.IntStream; @@ -200,7 +204,8 @@ public class MergeOnReadInputFormat this.requiredPos, this.emitDelete, this.tableState.getOperationPos(), - getFullSchemaReader(split.getBasePath().get())); + getFullSchemaReader(split.getBasePath().get()), + tableState.getMergeClass()); } else { throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for " + "file path: " + split.getBasePath() @@ -627,6 +632,8 @@ public class MergeOnReadInputFormat private final InstantRange instantRange; + private final HoodieMerge merge; + // add the flag because the flink 
ParquetColumnarRowSplitReader is buggy: // method #reachedEnd() returns false after it returns true. // refactor it out once FLINK-22370 is resolved. @@ -647,7 +654,8 @@ public class MergeOnReadInputFormat int[] requiredPos, boolean emitDelete, int operationPos, - ParquetColumnarRowSplitReader reader) { // the reader should be with full schema + ParquetColumnarRowSplitReader reader, // the reader should be with full schema + String mergeClass) { this.tableSchema = tableSchema; this.reader = reader; this.scanner = FormatUtils.logScanner(split, tableSchema, finkConf, hadoopConf); @@ -661,6 +669,7 @@ public class MergeOnReadInputFormat this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType); this.projection = RowDataProjection.instance(requiredRowType, requiredPos); this.instantRange = split.getInstantRange().orElse(null); + this.merge = HoodieRecordUtils.loadMerge(mergeClass); } @Override @@ -751,7 +760,8 @@ public class MergeOnReadInputFormat String curKey) throws IOException { final HoodieAvroRecord<?> record = (HoodieAvroRecord) scanner.getRecords().get(curKey); GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow); - return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema); + Option<HoodieRecord> resultRecord = merge.combineAndGetUpdateValue(new HoodieAvroIndexedRecord(historyAvroRecord), record, tableSchema, new Properties()); + return ((HoodieAvroIndexedRecord) resultRecord.get()).toIndexedRecord(); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java index 36dfecbb79..bbb21db7f8 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java +++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java @@ -41,6 +41,7 @@ public class MergeOnReadTableState implements Serializable { private final List<MergeOnReadInputSplit> inputSplits; private final String[] pkFields; private final int operationPos; + private final String mergeClass; public MergeOnReadTableState( RowType rowType, @@ -48,7 +49,8 @@ public class MergeOnReadTableState implements Serializable { String avroSchema, String requiredAvroSchema, List<MergeOnReadInputSplit> inputSplits, - String[] pkFields) { + String[] pkFields, + String mergeClass) { this.rowType = rowType; this.requiredRowType = requiredRowType; this.avroSchema = avroSchema; @@ -56,6 +58,7 @@ public class MergeOnReadTableState implements Serializable { this.inputSplits = inputSplits; this.pkFields = pkFields; this.operationPos = rowType.getFieldIndex(HoodieRecord.OPERATION_METADATA_FIELD); + this.mergeClass = mergeClass; } public RowType getRowType() { @@ -82,6 +85,10 @@ public class MergeOnReadTableState implements Serializable { return operationPos; } + public String getMergeClass() { + return mergeClass; + } + public int[] getRequiredPositions() { final List<String> fieldNames = rowType.getFieldNames(); return requiredRowType.getFieldNames().stream() diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 4e819ecd7b..4e481ab602 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -197,6 +197,7 @@ public class StreamerUtil { .archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS)) .build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder() + 
.withMergeClass(conf.getString(FlinkOptions.MERGE_CLASS_NAME)) .withTargetIOPerCompactionInMB(conf.getLong(FlinkOptions.COMPACTION_TARGET_IO)) .withInlineCompactionTriggerStrategy( CompactionTriggerStrategy.valueOf(conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT))) @@ -304,6 +305,7 @@ public class StreamerUtil { .setTableName(conf.getString(FlinkOptions.TABLE_NAME)) .setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null)) .setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME)) + .setMergeClassName(conf.getString(FlinkOptions.MERGE_CLASS_NAME)) .setPreCombineField(OptionsResolver.getPreCombineField(conf)) .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue()) .setPartitionFields(conf.getString(FlinkOptions.PARTITION_PATH_FIELD, null)) diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java index 63d5c1f6bd..f2095c1844 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java @@ -263,7 +263,8 @@ public class TestStreamReadOperator { tableAvroSchema.toString(), AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(), Collections.emptyList(), - new String[0]); + new String[0], + metaClient.getTableConfig().getMergeClass()); MergeOnReadInputFormat inputFormat = MergeOnReadInputFormat.builder() .config(conf) .tableState(hoodieTableState) diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index d0cf143318..1f51113e13 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -733,8 +733,8 @@ public class TestData { if (scanner != null) { for (String curKey : scanner.getRecords().keySet()) { if (!keyToSkip.contains(curKey)) { - Option<GenericRecord> record = (Option<GenericRecord>) scanner.getRecords() - .get(curKey).getData() + Option<GenericRecord> record = (Option<GenericRecord>) ((HoodieAvroRecord) scanner.getRecords() + .get(curKey)).getData() .getInsertValue(schema, config.getProps()); if (record.isPresent()) { readBuffer.add(filterOutVariables(record.get())); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java index 8ad1c3412e..f7918cf3fd 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java @@ -21,7 +21,7 @@ package org.apache.hudi.internal; import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.client.HoodieInternalWriteStatus; import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 16f52f33b1..0e95ce8973 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -301,6 +301,12 @@ object DataSourceWriteOptions { */ val PAYLOAD_CLASS_NAME = HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME + /** + * HoodieMerge will replace the payload to process the merge of data + * and provide the same capabilities as the payload + */ + val MERGE_CLASS_NAME = HoodieWriteConfig.MERGE_CLASS_NAME + /** * Record key field. Value to be used as the `recordKey` component of `HoodieKey`. Actual value * will be obtained by invoking .toString() on the field value. Nested fields can be specified using diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index e28d36d4c4..3c3e01ce78 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -40,9 +40,7 @@ import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema} import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper} -import org.apache.hudi.io.storage.HoodieHFileReader import org.apache.hudi.io.storage.HoodieAvroHFileReader -import org.apache.spark.SerializableWritable import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD @@ -74,7 +72,8 @@ case class HoodieTableState(tablePath: String, preCombineFieldOpt: Option[String], usesVirtualKeys: Boolean, recordPayloadClassName: String, - metadataConfig: HoodieMetadataConfig) + metadataConfig: HoodieMetadataConfig, + mergeClass: String) /** * Hoodie BaseRelation which extends [[PrunedFilteredScan]]. 
@@ -458,7 +457,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, preCombineFieldOpt = preCombineFieldOpt, usesVirtualKeys = !tableConfig.populateMetaFields(), recordPayloadClassName = tableConfig.getPayloadClass, - metadataConfig = fileIndex.metadataConfig + metadataConfig = fileIndex.metadataConfig, + mergeClass = tableConfig.getMergeClass ) } @@ -737,7 +737,7 @@ object HoodieBaseRelation extends SparkAdapterSupport { reader.getRecordIterator(requiredAvroSchema).asScala .map(record => { - avroToRowConverter.apply(record).get + avroToRowConverter.apply(record.asInstanceOf[GenericRecord]).get }) } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 927dbcfba7..d0032088a6 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -19,7 +19,7 @@ package org.apache.hudi import org.apache.avro.Schema -import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.JobConf @@ -31,8 +31,9 @@ import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig} import org.apache.hudi.common.engine.HoodieLocalEngineContext import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath -import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord, HoodieRecordPayload, OverwriteWithLatestAvroPayload} +import org.apache.hudi.common.model.{HoodieAvroIndexedRecord, HoodieLogFile, HoodieRecord, HoodieRecordPayload, OverwriteWithLatestAvroPayload} import 
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner +import org.apache.hudi.common.util.HoodieRecordUtils import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.config.HoodiePayloadConfig import org.apache.hudi.exception.HoodieException @@ -203,10 +204,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] = logRecords.iterator.map { case (_, record) => - val avroRecordOpt = toScalaOption(record.toIndexedRecord(logFileReaderAvroSchema, payloadProps)) - avroRecordOpt.map { - avroRecord => projectAvroUnsafe(avroRecord, requiredAvroSchema, requiredSchemaFieldOrdinals, recordBuilder) - } + toScalaOption(record.getData.asInstanceOf[HoodieRecordPayload[_]].getInsertValue(logFileReaderAvroSchema, payloadProps)) + .map(_.asInstanceOf[GenericRecord]) } protected def removeLogRecord(key: String): Option[HoodieRecord[_]] = @@ -295,6 +294,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, private val baseFileIterator = baseFileReader(split.dataFile.get) + private val merger = HoodieRecordUtils.loadMerge(tableState.mergeClass) + override def hasNext: Boolean = hasNextInternal // NOTE: It's crucial for this method to be annotated w/ [[@tailrec]] to make sure @@ -310,12 +311,12 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, recordToLoad = requiredSchemaUnsafeProjection(curRow) true } else { - val mergedAvroRecordOpt = merge(serialize(curRowRecord), updatedRecordOpt.get) + val mergedAvroRecordOpt = merge(serialize(curRow), updatedRecordOpt.get) if (mergedAvroRecordOpt.isEmpty) { // Record has been deleted, skipping this.hasNextInternal } else { - val projectedAvroRecord = projectAvroUnsafe(mergedAvroRecordOpt.get.asInstanceOf[GenericRecord], + val projectedAvroRecord = projectAvroUnsafe(mergedAvroRecordOpt.get.getData.asInstanceOf[GenericRecord], requiredAvroSchema, reusableRecordBuilder) recordToLoad = deserialize(projectedAvroRecord) true @@ 
-329,10 +330,10 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, private def serialize(curRowRecord: InternalRow): GenericRecord = serializer.serialize(curRowRecord).asInstanceOf[GenericRecord] - private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): Option[IndexedRecord] = { + private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_]): Option[HoodieRecord[_]] = { // NOTE: We have to pass in Avro Schema used to read from Delta Log file since we invoke combining API // on the record from the Delta Log - toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, logFileReaderAvroSchema, payloadProps)) + toScalaOption(merger.combineAndGetUpdateValue(new HoodieAvroIndexedRecord(curAvroRecord), newRecord, logFileReaderAvroSchema, payloadProps)) } } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 3fa18b122a..b935ce1b13 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -150,6 +150,7 @@ object HoodieSparkSqlWriter { .setBaseFileFormat(baseFileFormat) .setArchiveLogFolder(archiveLogFolder) .setPayloadClassName(hoodieConfig.getString(PAYLOAD_CLASS_NAME)) + .setMergeClassName(hoodieConfig.getString(MERGE_CLASS_NAME)) // we can't fetch preCombine field from hoodieConfig object, since it falls back to "ts" as default value, // but we are interested in what user has set, hence fetching from optParams. 
.setPreCombineField(optParams.getOrElse(PRECOMBINE_FIELD.key(), null)) @@ -484,6 +485,7 @@ object HoodieSparkSqlWriter { .setRecordKeyFields(recordKeyFields) .setArchiveLogFolder(archiveLogFolder) .setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME)) + .setMergeClassName(hoodieConfig.getStringOrDefault(MERGE_CLASS_NAME)) .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null)) .setBootstrapIndexClass(bootstrapIndexClass) .setBaseFileFormat(baseFileFormat) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieInternalRowUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieInternalRowUtils.scala new file mode 100644 index 0000000000..4ff5cceef7 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieInternalRowUtils.scala @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi + +import java.nio.charset.StandardCharsets +import java.util +import java.util.concurrent.ConcurrentHashMap +import org.apache.avro.Schema +import org.apache.hudi.AvroConversionUtils +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, fromJavaDate, toJavaDate} +import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField +import org.apache.hudi.common.util.ValidationUtils +import org.apache.hudi.exception.HoodieException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow, MutableProjection, Projection} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData} +import org.apache.spark.sql.hudi.ColumnStatsExpressionUtils.AllowedTransformationExpression.exprUtils.generateMutableProjection +import org.apache.spark.sql.types._ + +import scala.collection.mutable + +/** + * Helper class to do common stuff across Spark InternalRow. + * Provides common methods similar to {@link HoodieAvroUtils}. 
+ */ +object HoodieInternalRowUtils { + + val projectionMap = new ConcurrentHashMap[(StructType, StructType), MutableProjection] + val schemaMap = new ConcurrentHashMap[Schema, StructType] + val SchemaPosMap = new ConcurrentHashMap[StructType, Map[String, (StructField, Int)]] + + def stitchRecords(left: InternalRow, leftSchema: StructType, right: InternalRow, rightSchema: StructType, stitchedSchema: StructType): InternalRow = { + val mergeSchema = StructType(leftSchema.fields ++ rightSchema.fields) + val row = new JoinedRow(left, right) + val projection = getCachedProjection(mergeSchema, stitchedSchema) + projection(row) + } + + def rewriteRecord(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType): InternalRow = { + val newRow = new GenericInternalRow(Array.fill(newSchema.fields.length)(null).asInstanceOf[Array[Any]]) + + val oldFieldMap = getCachedSchemaPosMap(oldSchema) + for ((field, pos) <- newSchema.fields.zipWithIndex) { + var oldValue: AnyRef = null + if (oldFieldMap.contains(field.name)) { + val (oldField, oldPos) = oldFieldMap(field.name) + oldValue = oldRecord.get(oldPos, oldField.dataType) + } + if (oldValue != null) { + field.dataType match { + case structType: StructType => + val oldField = oldFieldMap(field.name)._1.asInstanceOf[StructType] + rewriteRecord(oldValue.asInstanceOf[InternalRow], oldField, structType) + case decimalType: DecimalType => + val oldField = oldFieldMap(field.name)._1.asInstanceOf[DecimalType] + if (decimalType.scale != oldField.scale || decimalType.precision != oldField.precision) { + newRow.update(pos, Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale)) + ) + } else { + newRow.update(pos, oldValue) + } + case _ => + newRow.update(pos, oldValue) + } + } else { + // TODO default value in newSchema + } + } + + newRow + } + + def rewriteRecordWithNewSchema(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType, renameCols: 
util.Map[String, String]): InternalRow = { + rewriteRecordWithNewSchema(oldRecord, oldSchema, newSchema, renameCols, new util.LinkedList[String]).asInstanceOf[InternalRow] + } + + private def rewriteRecordWithNewSchema(oldRecord: Any, oldSchema: DataType, newSchema: DataType, renameCols: util.Map[String, String], fieldNames: util.Deque[String]): Any = { + if (oldRecord == null) { + null + } else { + newSchema match { + case targetSchema: StructType => + ValidationUtils.checkArgument(oldRecord.isInstanceOf[InternalRow], "cannot rewrite record with different type") + val oldRow = oldRecord.asInstanceOf[InternalRow] + val helper = mutable.Map[Integer, Any]() + + val oldSchemaPos = getCachedSchemaPosMap(oldSchema.asInstanceOf[StructType]) + targetSchema.fields.zipWithIndex.foreach { case (field, i) => + fieldNames.push(field.name) + if (oldSchemaPos.contains(field.name)) { + val (oldField, oldPos) = oldSchemaPos(field.name) + helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames) + } else { + val fieldFullName = createFullName(fieldNames) + val colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.") + val lastColNameFromOldSchema = colNamePartsFromOldSchema(colNamePartsFromOldSchema.length - 1) + // deal with rename + if (!oldSchemaPos.contains(field.name) && oldSchemaPos.contains(lastColNameFromOldSchema)) { + // find rename + val (oldField, oldPos) = oldSchemaPos(lastColNameFromOldSchema) + helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames) + } + } + fieldNames.pop() + } + val newRow = new GenericInternalRow(Array.fill(targetSchema.length)(null).asInstanceOf[Array[Any]]) + targetSchema.fields.zipWithIndex.foreach { case (_, i) => + if (helper.contains(i)) { + newRow.update(i, helper(i)) + } else { + // TODO add default val + newRow.update(i, null) + } + } + + newRow + case 
targetSchema: ArrayType => + ValidationUtils.checkArgument(oldRecord.isInstanceOf[ArrayData], "cannot rewrite record with different type") + val oldElementType = oldSchema.asInstanceOf[ArrayType].elementType + val oldArray = oldRecord.asInstanceOf[ArrayData] + val newElementType = targetSchema.elementType + val newArray = new GenericArrayData(Array.fill(oldArray.numElements())(null).asInstanceOf[Array[Any]]) + fieldNames.push("element") + oldArray.toSeq[Any](oldElementType).zipWithIndex.foreach { case (value, i) => newArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldElementType, newElementType, renameCols, fieldNames)) } + fieldNames.pop() + + newArray + case targetSchema: MapType => + ValidationUtils.checkArgument(oldRecord.isInstanceOf[MapData], "cannot rewrite record with different type") + val oldValueType = oldSchema.asInstanceOf[MapType].valueType + val oldKeyType = oldSchema.asInstanceOf[MapType].keyType + val oldMap = oldRecord.asInstanceOf[MapData] + val newValueType = targetSchema.valueType + val newKeyArray = new GenericArrayData(Array.fill(oldMap.keyArray().numElements())(null).asInstanceOf[Array[Any]]) + val newValueArray = new GenericArrayData(Array.fill(oldMap.valueArray().numElements())(null).asInstanceOf[Array[Any]]) + val newMap = new ArrayBasedMapData(newKeyArray, newValueArray) + fieldNames.push("value") + oldMap.keyArray().toSeq[Any](oldKeyType).zipWithIndex.foreach { case (value, i) => newKeyArray.update(i, value) } + oldMap.valueArray().toSeq[Any](oldValueType).zipWithIndex.foreach { case (value, i) => newValueArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldValueType, newValueType, renameCols, fieldNames)) } + fieldNames.pop() + + newMap + case _ => rewritePrimaryType(oldRecord, oldSchema, newSchema) + } + } + } + + def rewriteRecordWithMetadata(record: InternalRow, oldSchema: StructType, newSchema: StructType, fileName: String): InternalRow = { + val newRecord = rewriteRecord(record, 
  /**
   * Rewrites `record` into `newSchema` using the schema-evolution path (no column renames),
   * then stamps `fileName` into the FILENAME metadata column.
   */
  def rewriteEvolutionRecordWithMetadata(record: InternalRow, oldSchema: StructType, newSchema: StructType, fileName: String): InternalRow = {
    val newRecord = rewriteRecordWithNewSchema(record, oldSchema, newSchema, new util.HashMap[String, String]())
    newRecord.update(HoodieMetadataField.FILENAME_METADATA_FIELD.ordinal, fileName)

    newRecord
  }

  /**
   * Converts an Avro [[Schema]] to a Catalyst [[StructType]], memoizing the result.
   * NOTE(review): this is double-checked locking over `schemaMap`; it is only safe if
   * `schemaMap` (declared outside this view) is a thread-safe map — confirm.
   */
  def getCachedSchema(schema: Schema): StructType = {
    if (!schemaMap.contains(schema)) {
      schemaMap.synchronized {
        if (!schemaMap.contains(schema)) {
          val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
          schemaMap.put(schema, structType)
        }
      }
    }
    schemaMap.get(schema)
  }

  /**
   * Returns a memoized projection from `from` to `to`.
   * NOTE(review): same double-checked-locking caveat as getCachedSchema — `projectionMap`
   * must be thread-safe for this to be correct.
   */
  private def getCachedProjection(from: StructType, to: StructType): Projection = {
    val schemaPair = (from, to)
    if (!projectionMap.contains(schemaPair)) {
      projectionMap.synchronized {
        if (!projectionMap.contains(schemaPair)) {
          val projection = generateMutableProjection(from, to)
          projectionMap.put(schemaPair, projection)
        }
      }
    }
    projectionMap.get(schemaPair)
  }

  /**
   * Returns a memoized map of field name -> (StructField, ordinal) for `schema`, used to
   * locate old-schema positions during record rewriting.
   * NOTE(review): same double-checked-locking caveat — `SchemaPosMap` must be thread-safe.
   */
  def getCachedSchemaPosMap(schema: StructType): Map[String, (StructField, Int)] = {
    if (!SchemaPosMap.contains(schema)) {
      SchemaPosMap.synchronized {
        if (!SchemaPosMap.contains(schema)) {
          val fieldMap = schema.fields.zipWithIndex.map { case (field, i) => (field.name, (field, i)) }.toMap
          SchemaPosMap.put(schema, fieldMap)
        }
      }
    }
    SchemaPosMap.get(schema)
  }
Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale)) + case _ => + throw new HoodieException("Unknown schema type: " + newSchema) + } + } else { + rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema) + } + } + + private def rewritePrimaryTypeWithDiffSchemaType(oldValue: Any, oldSchema: DataType, newSchema: DataType): Any = { + val value = newSchema match { + case NullType | BooleanType => + case DateType if oldSchema.equals(StringType) => + fromJavaDate(java.sql.Date.valueOf(oldValue.toString)) + case LongType => + oldSchema match { + case IntegerType => oldValue.asInstanceOf[Int].longValue() + case _ => + } + case FloatType => + oldSchema match { + case IntegerType => oldValue.asInstanceOf[Int].floatValue() + case LongType => oldValue.asInstanceOf[Long].floatValue() + case _ => + } + case DoubleType => + oldSchema match { + case IntegerType => oldValue.asInstanceOf[Int].doubleValue() + case LongType => oldValue.asInstanceOf[Long].doubleValue() + case FloatType => java.lang.Double.valueOf(oldValue.asInstanceOf[Float] + "") + case _ => + } + case BinaryType => + oldSchema match { + case StringType => oldValue.asInstanceOf[String].getBytes(StandardCharsets.UTF_8) + case _ => + } + case StringType => + oldSchema match { + case BinaryType => new String(oldValue.asInstanceOf[Array[Byte]]) + case DateType => toJavaDate(oldValue.asInstanceOf[Integer]).toString + case IntegerType | LongType | FloatType | DoubleType | DecimalType() => oldValue.toString + case _ => + } + case DecimalType() => + oldSchema match { + case IntegerType | LongType | FloatType | DoubleType | StringType => + val scale = newSchema.asInstanceOf[DecimalType].scale + + Decimal.fromDecimal(BigDecimal(oldValue.toString).setScale(scale)) + case _ => + } + case _ => + } + if (value == None) { + throw new HoodieException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, 
oldSchema)) + } else { + value + } + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecord.java b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecord.java new file mode 100644 index 0000000000..a22e78af21 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkRecord.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hudi;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.keygen.BaseKeyGenerator;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static org.apache.spark.sql.types.DataTypes.BooleanType;

/**
 * Spark engine-specific implementation of {@code HoodieRecord}, backed by a Catalyst
 * {@link InternalRow} instead of an Avro record.
 *
 * <p>Bug fix vs. the original: every rewrite/merge method constructed the result via
 * {@code new HoodieSparkRecord(key, row, getOperation())}. Because {@link HoodieOperation}
 * is an enum (and enums implement {@code Comparable}), that call silently bound to the
 * {@code (key, data, orderingVal)} constructor, storing the operation as the ordering value
 * and losing both the real operation and the real ordering value. All such call sites now
 * use the explicit 4-argument constructor.
 */
public class HoodieSparkRecord extends HoodieRecord<InternalRow> {

  public HoodieSparkRecord(HoodieKey key, InternalRow data, Comparable orderingVal) {
    super(key, data, orderingVal);
  }

  public HoodieSparkRecord(HoodieKey key, InternalRow data, HoodieOperation operation, Comparable orderingVal) {
    super(key, data, operation, orderingVal);
  }

  public HoodieSparkRecord(HoodieRecord<InternalRow> record) {
    super(record);
  }

  public HoodieSparkRecord() {
  }

  @Override
  public HoodieRecord<InternalRow> newInstance() {
    return new HoodieSparkRecord(this);
  }

  @Override
  public HoodieRecord<InternalRow> newInstance(HoodieKey key, HoodieOperation op) {
    return new HoodieSparkRecord(key, data, op, getOrderingValue());
  }

  @Override
  public HoodieRecord<InternalRow> newInstance(HoodieKey key) {
    return new HoodieSparkRecord(key, data, getOrderingValue());
  }

  @Override
  public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
    // Key generators are not needed: the key is carried explicitly on the record.
    return getRecordKey();
  }

  @Override
  public String getRecordKey(String keyFieldName) {
    return getRecordKey();
  }

  /**
   * Stitches this record's row with {@code other}'s row (both under the reader schema)
   * into a single row shaped by the writer schema.
   */
  @Override
  public HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException {
    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(readerSchema);
    StructType writerStructType = HoodieInternalRowUtils.getCachedSchema(writerSchema);
    InternalRow mergeRow = HoodieInternalRowUtils.stitchRecords(data, readerStructType, (InternalRow) other.getData(), readerStructType, writerStructType);
    return new HoodieSparkRecord(getKey(), mergeRow, getOperation(), getOrderingValue());
  }

  /** Projects this record's row from {@code recordSchema} onto {@code targetSchema}. */
  @Override
  public HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException {
    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
    StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecord(data, readerStructType, targetStructType);
    return new HoodieSparkRecord(getKey(), rewriteRow, getOperation(), getOrderingValue());
  }

  /**
   * Rewrites onto the write schema (with meta fields); takes the schema-evolution path
   * when {@code schemaOnReadEnabled} is set.
   */
  @Override
  public HoodieRecord rewriteRecord(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws IOException {
    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
    StructType writeSchemaWithMetaFieldsStructType = HoodieInternalRowUtils.getCachedSchema(writeSchemaWithMetaFields);
    InternalRow rewriteRow = schemaOnReadEnabled
        ? HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, readerStructType, writeSchemaWithMetaFieldsStructType, new HashMap<>())
        : HoodieInternalRowUtils.rewriteRecord(data, readerStructType, writeSchemaWithMetaFieldsStructType);
    return new HoodieSparkRecord(getKey(), rewriteRow, getOperation(), getOrderingValue());
  }

  /** Same as {@link #rewriteRecord(Schema, Properties, boolean, Schema)} but also stamps the file name. */
  @Override
  public HoodieRecord rewriteRecordWithMetadata(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields, String fileName) throws IOException {
    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
    StructType writeSchemaWithMetaFieldsStructType = HoodieInternalRowUtils.getCachedSchema(writeSchemaWithMetaFields);
    InternalRow rewriteRow = schemaOnReadEnabled
        ? HoodieInternalRowUtils.rewriteEvolutionRecordWithMetadata(data, readerStructType, writeSchemaWithMetaFieldsStructType, fileName)
        : HoodieInternalRowUtils.rewriteRecordWithMetadata(data, readerStructType, writeSchemaWithMetaFieldsStructType, fileName);
    return new HoodieSparkRecord(getKey(), rewriteRow, getOperation(), getOrderingValue());
  }

  @Override
  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols) throws IOException {
    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
    StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, readerStructType, newStructType, renameCols);
    return new HoodieSparkRecord(getKey(), rewriteRow, getOperation(), getOrderingValue());
  }

  @Override
  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols, Mapper mapper) throws IOException {
    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
    StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, readerStructType, newStructType, renameCols);
    // TODO change mapper type: an InternalRow is NOT an IndexedRecord, so this cast will
    // fail at runtime; the Mapper abstraction needs an engine-neutral row type.
    return mapper.apply((IndexedRecord) rewriteRow);
  }

  @Override
  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema) throws IOException {
    StructType readerStructType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
    StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecord(data, readerStructType, newStructType);
    return new HoodieSparkRecord(getKey(), rewriteRow, getOperation(), getOrderingValue());
  }

  /** Mutates this record's row in place at ordinal {@code pos}. */
  @Override
  public HoodieRecord overrideMetadataFieldValue(Schema recordSchema, Properties prop, int pos, String newValue) throws IOException {
    data.update(pos, newValue);
    return this;
  }

  /** Writes the supplied metadata values into their columns (in place); absent keys are skipped. */
  @Override
  public HoodieRecord addMetadataValues(Schema recordSchema, Properties prop, Map<HoodieMetadataField, String> metadataValues) throws IOException {
    Arrays.stream(HoodieMetadataField.values()).forEach(metadataField -> {
      String value = metadataValues.get(metadataField);
      if (value != null) {
        data.update(recordSchema.getField(metadataField.getFieldName()).pos(), value);
      }
    });
    return this;
  }

  @Override
  public Option<Map<String, String>> getMetadata() {
    // Row-based records carry no auxiliary metadata map.
    return Option.empty();
  }

  /** A record is "present" unless its _hoodie_is_deleted column is true. */
  @Override
  public boolean isPresent(Schema schema, Properties prop) throws IOException {
    if (null == data) {
      return false;
    }
    Schema.Field deleteField = schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD);
    if (deleteField == null) {
      // No delete-marker column in this schema: the record cannot be a soft delete.
      return true;
    }
    Object deleteMarker = data.get(deleteField.pos(), BooleanType);
    return !(deleteMarker instanceof Boolean && (boolean) deleteMarker);
  }

  @Override
  public boolean shouldIgnore(Schema schema, Properties prop) throws IOException {
    // TODO SENTINEL should refactor without Avro(GenericRecord)
    return data != null && data.equals(SENTINEL);
  }

  @Override
  public Option<IndexedRecord> toIndexedRecord(Schema schema, Properties prop) throws IOException {
    throw new UnsupportedOperationException();
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hudi;

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieMerge;
import org.apache.hudi.common.util.Option;

import java.io.IOException;
import java.util.Properties;

/**
 * Spark implementation of {@code HoodieMerge}: latest-write-wins semantics based on the
 * records' ordering values.
 */
public class HoodieSparkRecordMerge implements HoodieMerge {

  @Override
  public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
    // A delete record (null payload) keeps natural order: the older record wins outright.
    if (older.getData() == null) {
      return older;
    }
    // Otherwise the record with the strictly greater ordering value wins; ties go to newer.
    return older.getOrderingValue().compareTo(newer.getOrderingValue()) > 0 ? older : newer;
  }

  @Override
  public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
    // The incoming record unconditionally replaces the stored one.
    return Option.of(newer);
  }
}
parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader) - val writer = new HoodieAvroParquetWriter[IndexedRecord](destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true) + val writer = new HoodieAvroParquetWriter(destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true) for (rec <- sourceRecords) { val key: String = rec.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString if (!keysToSkip.contains(key)) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala new file mode 100644 index 0000000000..7a08ee64bf --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi

import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.testutils.HoodieClientTestUtils
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.hudi.HoodieInternalRowUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}

/**
 * Tests for [[HoodieInternalRowUtils]]: stitching two rows, projecting a row onto a
 * narrower/wider schema, and rewriting with Hudi metadata columns attached.
 */
class TestHoodieInternalRowUtils extends FunSuite with Matchers with BeforeAndAfterAll {

  private var sparkSession: SparkSession = _

  private val schema1 = StructType(
    Array(
      StructField("name", StringType),
      StructField("age", IntegerType)
    )
  )
  private val schema2 = StructType(
    Array(
      StructField("name1", StringType),
      StructField("age1", IntegerType)
    )
  )
  private val schemaMerge = StructType(schema1.fields ++ schema2.fields)
  // schema1 preceded by the standard Hudi meta columns, in their canonical order.
  private val schema1WithMetaData = StructType(Array(
    StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
    StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType),
    StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType),
    StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType),
    StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType),
    StructField(HoodieRecord.OPERATION_METADATA_FIELD, StringType),
    StructField(HoodieRecord.HOODIE_IS_DELETED_FIELD, BooleanType)
  ) ++ schema1.fields)

  override protected def beforeAll(): Unit = {
    // Spin up a local Spark session for the whole suite.
    val jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(classOf[TestHoodieInternalRowUtils].getName))
    jsc.setLogLevel("ERROR")
    sparkSession = SparkSession.builder.config(jsc.getConf).getOrCreate
  }

  override protected def afterAll(): Unit = {
    sparkSession.close()
  }

  /** Materializes a single [[Row]] under `schema` as a Catalyst internal row. */
  private def asInternalRow(row: Row, schema: StructType) =
    sparkSession.createDataFrame(sparkSession.sparkContext.parallelize(Seq(row)), schema).queryExecution.toRdd.first()

  test("test merge") {
    val left = asInternalRow(Row("like", 18), schema1)
    val right = asInternalRow(Row("like1", 181), schema2)
    val stitched = HoodieInternalRowUtils.stitchRecords(left, schema1, right, schema2, schemaMerge)
    assert(stitched.get(0, StringType).toString.equals("like"))
    assert(stitched.get(1, IntegerType) == 18)
    assert(stitched.get(2, StringType).toString.equals("like1"))
    assert(stitched.get(3, IntegerType) == 181)
  }

  test("test rewrite") {
    val source = asInternalRow(Row("like", 18, "like1", 181), schemaMerge)
    val projected1 = HoodieInternalRowUtils.rewriteRecord(source, schemaMerge, schema1)
    val projected2 = HoodieInternalRowUtils.rewriteRecord(source, schemaMerge, schema2)
    assert(projected1.get(0, StringType).toString.equals("like"))
    assert(projected1.get(1, IntegerType) == 18)
    assert(projected2.get(0, StringType).toString.equals("like1"))
    assert(projected2.get(1, IntegerType) == 181)
  }

  test("test rewrite with nullable value") {
    // Widening schema1 -> schemaMerge: the columns absent from the source stay null.
    val source = asInternalRow(Row("like", 18), schema1)
    val widened = HoodieInternalRowUtils.rewriteRecord(source, schema1, schemaMerge)
    assert(widened.get(0, StringType).toString.equals("like"))
    assert(widened.get(1, IntegerType) == 18)
    assert(widened.get(2, StringType) == null)
    assert(widened.get(3, IntegerType) == null)
  }

  test("test rewrite with metaDataFiled value") {
    // Only the FILENAME meta column is populated; the rest of the meta columns stay null.
    val source = asInternalRow(Row("like", 18), schema1)
    val withMeta = HoodieInternalRowUtils.rewriteRecordWithMetadata(source, schema1, schema1WithMetaData, "file1")
    assert(withMeta.get(0, StringType) == null)
    assert(withMeta.get(1, StringType) == null)
    assert(withMeta.get(2, StringType) == null)
    assert(withMeta.get(3, StringType) == null)
    assert(withMeta.get(4, StringType).toString.equals("file1"))
    assert(withMeta.get(5, StringType) == null)
    assert(withMeta.get(6, BooleanType) == null)
    assert(withMeta.get(7, StringType).toString.equals("like"))
    assert(withMeta.get(8, IntegerType) == 18)
  }
}
+ */ + +package org.apache.hudi + +import java.nio.ByteBuffer +import java.util.{ArrayList, HashMap, Objects} +import org.apache.avro.generic.GenericData +import org.apache.avro.{LogicalTypes, Schema} +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.internal.schema.Types +import org.apache.hudi.internal.schema.action.TableChanges +import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter +import org.apache.hudi.internal.schema.utils.SchemaChangeUtils +import org.apache.hudi.testutils.HoodieClientTestUtils +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} +import org.apache.spark.sql.hudi.HoodieInternalRowUtils +import org.apache.spark.sql.types._ +import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} + +class TestStructTypeSchemaEvolutionUtils extends FunSuite with Matchers with BeforeAndAfterAll { + private var sparkSession: SparkSession = _ + + override protected def beforeAll(): Unit = { + // Initialize a local spark env + val jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(classOf[TestHoodieInternalRowUtils].getName)) + jsc.setLogLevel("ERROR") + sparkSession = SparkSession.builder.config(jsc.getConf).getOrCreate + } + + override protected def afterAll(): Unit = { + sparkSession.close() + } + + /** + * test record data type changes. 
+ * int => long/float/double/string + * long => float/double/string + * float => double/String + * double => String/Decimal + * Decimal => Decimal/String + * String => date/decimal + * date => String + */ + test("test rewrite record with type changed") { + val avroSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"h0_record\",\"namespace\":\"hoodie.h0\",\"fields\"" + + ":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null}," + + "{\"name\":\"comb\",\"type\":[\"null\",\"int\"],\"default\":null}," + + "{\"name\":\"com1\",\"type\":[\"null\",\"int\"],\"default\":null}," + + "{\"name\":\"col0\",\"type\":[\"null\",\"int\"],\"default\":null}," + + "{\"name\":\"col1\",\"type\":[\"null\",\"long\"],\"default\":null}," + + "{\"name\":\"col11\",\"type\":[\"null\",\"long\"],\"default\":null}," + + "{\"name\":\"col12\",\"type\":[\"null\",\"long\"],\"default\":null}," + + "{\"name\":\"col2\",\"type\":[\"null\",\"float\"],\"default\":null}," + + "{\"name\":\"col21\",\"type\":[\"null\",\"float\"],\"default\":null}," + + "{\"name\":\"col3\",\"type\":[\"null\",\"double\"],\"default\":null}," + + "{\"name\":\"col31\",\"type\":[\"null\",\"double\"],\"default\":null}," + + "{\"name\":\"col4\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"fixed\",\"namespace\":\"hoodie.h0.h0_record.col4\"," + + "\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":4}],\"default\":null}," + + "{\"name\":\"col41\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"fixed\",\"namespace\":\"hoodie.h0.h0_record.col41\"," + + "\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":4}],\"default\":null}," + + "{\"name\":\"col5\",\"type\":[\"null\",\"string\"],\"default\":null}," + + "{\"name\":\"col51\",\"type\":[\"null\",\"string\"],\"default\":null}," + + "{\"name\":\"col6\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null}," + + 
"{\"name\":\"col7\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}],\"default\":null}," + + "{\"name\":\"col8\",\"type\":[\"null\",\"boolean\"],\"default\":null}," + + "{\"name\":\"col9\",\"type\":[\"null\",\"bytes\"],\"default\":null},{\"name\":\"par\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null}]}") + // create a test record with avroSchema + val avroRecord = new GenericData.Record(avroSchema) + avroRecord.put("id", 1) + avroRecord.put("comb", 100) + avroRecord.put("com1", -100) + avroRecord.put("col0", 256) + avroRecord.put("col1", 1000L) + avroRecord.put("col11", -100L) + avroRecord.put("col12", 2000L) + avroRecord.put("col2", -5.001f) + avroRecord.put("col21", 5.001f) + avroRecord.put("col3", 12.999d) + avroRecord.put("col31", 9999.999d) + val currentDecimalType = avroSchema.getField("col4").schema.getTypes.get(1) + val bd = new java.math.BigDecimal("123.456").setScale(currentDecimalType.getLogicalType.asInstanceOf[LogicalTypes.Decimal].getScale) + avroRecord.put("col4", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, currentDecimalType, currentDecimalType.getLogicalType)) + val currentDecimalType1 = avroSchema.getField("col41").schema.getTypes.get(1) + val bd1 = new java.math.BigDecimal("7890.456").setScale(currentDecimalType1.getLogicalType.asInstanceOf[LogicalTypes.Decimal].getScale) + avroRecord.put("col41", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd1, currentDecimalType1, currentDecimalType1.getLogicalType)) + avroRecord.put("col5", "2011-01-01") + avroRecord.put("col51", "199.342") + avroRecord.put("col6", 18987) + avroRecord.put("col7", 1640491505000000L) + avroRecord.put("col8", false) + val bb = ByteBuffer.wrap(Array[Byte](97, 48, 53)) + avroRecord.put("col9", bb) + assert(GenericData.get.validate(avroSchema, avroRecord)) + val internalSchema = AvroInternalSchemaConverter.convert(avroSchema) + // do change type operation + val updateChange = 
TableChanges.ColumnUpdateChange.get(internalSchema) + updateChange.updateColumnType("id", Types.LongType.get).updateColumnType("comb", Types.FloatType.get).updateColumnType("com1", Types.DoubleType.get).updateColumnType("col0", Types.StringType.get).updateColumnType("col1", Types.FloatType.get).updateColumnType("col11", Types.DoubleType.get).updateColumnType("col12", Types.StringType.get).updateColumnType("col2", Types.DoubleType.get).updateColumnType("col21", Types.StringType.get).updateColumnType("col3", Types.StringType.get).updateCo [...] + val newSchema = SchemaChangeUtils.applyTableChanges2Schema(internalSchema, updateChange) + val newAvroSchema = AvroInternalSchemaConverter.convert(newSchema, avroSchema.getName) + val newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap[String, String]) + assert(GenericData.get.validate(newAvroSchema, newRecord)) + // Convert avro to internalRow + val structTypeSchema = HoodieInternalRowUtils.getCachedSchema(avroSchema) + val newStructTypeSchema = HoodieInternalRowUtils.getCachedSchema(newAvroSchema) + val row = AvroConversionUtils.createAvroToInternalRowConverter(avroSchema, structTypeSchema).apply(avroRecord).get + val newRowExpected = AvroConversionUtils.createAvroToInternalRowConverter(newAvroSchema, newStructTypeSchema) + .apply(newRecord).get + val newRowActual = HoodieInternalRowUtils.rewriteRecordWithNewSchema(row, structTypeSchema, newStructTypeSchema, new HashMap[String, String]) + internalRowCompare(newRowExpected, newRowActual, newStructTypeSchema) + } + + test("test rewrite nest record") { + val record = Types.RecordType.get(Types.Field.get(0, false, "id", Types.IntType.get()), + Types.Field.get(1, true, "data", Types.StringType.get()), + Types.Field.get(2, true, "preferences", + Types.RecordType.get(Types.Field.get(5, false, "feature1", + Types.BooleanType.get()), Types.Field.get(6, true, "feature2", Types.BooleanType.get()))), + Types.Field.get(3, false, "doubles", 
Types.ArrayType.get(7, false, Types.DoubleType.get())), + Types.Field.get(4, false, "locations", Types.MapType.get(8, 9, Types.StringType.get(), + Types.RecordType.get(Types.Field.get(10, false, "lat", Types.FloatType.get()), Types.Field.get(11, false, "long", Types.FloatType.get())), false)) + ) + val schema = AvroInternalSchemaConverter.convert(record, "test1") + val avroRecord = new GenericData.Record(schema) + GenericData.get.validate(schema, avroRecord) + avroRecord.put("id", 2) + avroRecord.put("data", "xs") + // fill record type + val preferencesRecord = new GenericData.Record(AvroInternalSchemaConverter.convert(record.fieldType("preferences"), "test1_preferences")) + preferencesRecord.put("feature1", false) + preferencesRecord.put("feature2", true) + assert(GenericData.get.validate(AvroInternalSchemaConverter.convert(record.fieldType("preferences"), "test1_preferences"), preferencesRecord)) + avroRecord.put("preferences", preferencesRecord) + // fill mapType + val locations = new HashMap[String, GenericData.Record] + val mapSchema = AvroInternalSchemaConverter.convert(record.field("locations").`type`.asInstanceOf[Types.MapType].valueType, "test1_locations") + val locationsValue: GenericData.Record = new GenericData.Record(mapSchema) + locationsValue.put("lat", 1.2f) + locationsValue.put("long", 1.4f) + val locationsValue1: GenericData.Record = new GenericData.Record(mapSchema) + locationsValue1.put("lat", 2.2f) + locationsValue1.put("long", 2.4f) + locations.put("key1", locationsValue) + locations.put("key2", locationsValue1) + avroRecord.put("locations", locations) + val doubles = new ArrayList[Double] + doubles.add(2.0d) + doubles.add(3.0d) + avroRecord.put("doubles", doubles) + // do check + assert(GenericData.get.validate(schema, avroRecord)) + // create newSchema + val newRecord = Types.RecordType.get(Types.Field.get(0, false, "id", Types.IntType.get), Types.Field.get(1, true, "data", Types.StringType.get), Types.Field.get(2, true, "preferences", 
Types.RecordType.get(Types.Field.get(5, false, "feature1", Types.BooleanType.get), Types.Field.get(5, true, "featurex", Types.BooleanType.get), Types.Field.get(6, true, "feature2", Types.BooleanType.get))), Types.Field.get(3, false, "doubles", Types.ArrayType.get(7, false, Types.DoubleType.get)), Types [...] + val newAvroSchema = AvroInternalSchemaConverter.convert(newRecord, schema.getName) + val newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap[String, String]) + // test the correctly of rewrite + assert(GenericData.get.validate(newAvroSchema, newAvroRecord)) + // Convert avro to internalRow + val structTypeSchema = HoodieInternalRowUtils.getCachedSchema(schema) + val newStructTypeSchema = HoodieInternalRowUtils.getCachedSchema(newAvroSchema) + val row = AvroConversionUtils.createAvroToInternalRowConverter(schema, structTypeSchema).apply(avroRecord).get + val newRowExpected = AvroConversionUtils.createAvroToInternalRowConverter(newAvroSchema, newStructTypeSchema).apply(newAvroRecord).get + val newRowActual = HoodieInternalRowUtils.rewriteRecordWithNewSchema(row, structTypeSchema, newStructTypeSchema, new HashMap[String, String]) + internalRowCompare(newRowExpected, newRowActual, newStructTypeSchema) + } + + private def internalRowCompare(expected: Any, actual: Any, schema: DataType): Unit = { + schema match { + case StructType(fields) => + val expectedRow = expected.asInstanceOf[InternalRow] + val actualRow = actual.asInstanceOf[InternalRow] + fields.zipWithIndex.foreach { case (field, i) => internalRowCompare(expectedRow.get(i, field.dataType), actualRow.get(i, field.dataType), field.dataType) } + case ArrayType(elementType, _) => + val expectedArray = expected.asInstanceOf[ArrayData].toSeq[Any](elementType) + val actualArray = actual.asInstanceOf[ArrayData].toSeq[Any](elementType) + if (expectedArray.size != actualArray.size) { + throw new AssertionError() + } else { + expectedArray.zip(actualArray).foreach { case 
(e1, e2) => internalRowCompare(e1, e2, elementType) } + } + case MapType(keyType, valueType, _) => + val expectedKeyArray = expected.asInstanceOf[MapData].keyArray() + val expectedValueArray = expected.asInstanceOf[MapData].valueArray() + val actualKeyArray = actual.asInstanceOf[MapData].keyArray() + val actualValueArray = actual.asInstanceOf[MapData].valueArray() + internalRowCompare(expectedKeyArray, actualKeyArray, ArrayType(keyType)) + internalRowCompare(expectedValueArray, actualValueArray, ArrayType(valueType)) + case StringType => if (checkNull(expected, actual) || !expected.toString.equals(actual.toString)) { + throw new AssertionError(String.format("%s is not equals %s", expected.toString, actual.toString)) + } + // TODO Verify after 'https://github.com/apache/hudi/pull/5907' merge + case BinaryType => if (checkNull(expected, actual) || !expected.asInstanceOf[Array[Byte]].sameElements(actual.asInstanceOf[Array[Byte]])) { + // throw new AssertionError(String.format("%s is not equals %s", expected.toString, actual.toString)) + } + case _ => if (!Objects.equals(expected, actual)) { + // throw new AssertionError(String.format("%s is not equals %s", expected.toString, actual.toString)) + } + } + } + + private def checkNull(left: Any, right: Any): Boolean = { + (left == null && right != null) || (left == null && right != null) + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index badd3ab627..dbe5285e9d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -275,6 +275,7 @@ public class DeltaSync implements Serializable { .setTableName(cfg.targetTableName) .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue()) .setPayloadClassName(cfg.payloadClassName) + 
.setMergeClassName(cfg.mergeClassName) .setBaseFileFormat(cfg.baseFileFormat) .setPartitionFields(partitionColumns) .setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key())) @@ -372,6 +373,7 @@ public class DeltaSync implements Serializable { .setTableName(cfg.targetTableName) .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue()) .setPayloadClassName(cfg.payloadClassName) + .setMergeClassName(cfg.mergeClassName) .setBaseFileFormat(cfg.baseFileFormat) .setPartitionFields(partitionColumns) .setRecordKeyFields(props.getProperty(DataSourceWriteOptions.RECORDKEY_FIELD().key())) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index a22a3581ae..dcf1581216 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieAvroRecordMerge; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.WriteOperationType; @@ -270,6 +271,10 @@ public class HoodieDeltaStreamer implements Serializable { + "a GenericRecord. Implement your own, if you want to do something other than overwriting existing value") public String payloadClassName = OverwriteWithLatestAvroPayload.class.getName(); + @Parameter(names = {"--merge-class"}, description = "Implements of HoodieMerge, that defines how to merge two records." 
+ + "Implement your own, if you want to implement specific record merge logic.") + public String mergeClassName = HoodieAvroRecordMerge.class.getName(); + @Parameter(names = {"--schemaprovider-class"}, description = "subclass of org.apache.hudi.utilities.schema" + ".SchemaProvider to attach schemas to input & target table data, built in options: " + "org.apache.hudi.utilities.schema.FilebasedSchemaProvider."
