nsivabalan commented on code in PR #13600:
URL: https://github.com/apache/hudi/pull/13600#discussion_r2249160326
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -85,9 +102,77 @@ public I combineOnCondition(
* @return Collection of HoodieRecord already be deduplicated
*/
public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int parallelism) {
+    HoodieReaderContext<T> readerContext =
+        (HoodieReaderContext<T>) table.getContext().<T>getReaderContextFactoryDuringWrite(table.getMetaClient(), table.getConfig().getRecordMerger().getRecordType(), table.getConfig().getProps())
Review Comment:
Minor: how about `getReaderContextFactoryForWrites`?
##########
hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java:
##########
@@ -84,4 +85,12 @@ public Comparable<?> getOrderingValue() {
public byte[] getRecordBytes() {
return recordBytes;
}
+
+ @Override
+  public Option<IndexedRecord> getIndexedRecord(Schema schema, Properties properties) throws IOException {
Review Comment:
Should we move this to `BaseAvroPayload`?
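For context, hoisting the override could look like this toy sketch (simplified stand-in types, not the real Hudi signatures; the string decode stands in for `HoodieAvroUtils.bytesToAvro`):

```java
import java.util.Optional;

// Toy stand-in for the payload hierarchy: the base class already owns
// recordBytes, so the decode logic can live there once instead of being
// overridden in each Avro payload subclass.
abstract class BaseAvroPayloadSketch {
  protected final byte[] recordBytes;

  protected BaseAvroPayloadSketch(byte[] recordBytes) {
    this.recordBytes = recordBytes;
  }

  // Hoisted shared implementation: decodes the stored bytes against the
  // given schema; an empty payload encodes a delete.
  public Optional<String> getIndexedRecord(String schema) {
    if (recordBytes.length == 0) {
      return Optional.empty();
    }
    return Optional.of(new String(recordBytes) + "@" + schema);
  }
}

// The subclass from the diff then needs no override of its own.
class OverwriteWithLatestSketch extends BaseAvroPayloadSketch {
  OverwriteWithLatestSketch(byte[] recordBytes) {
    super(recordBytes);
  }

  public static void main(String[] args) {
    System.out.println(new OverwriteWithLatestSketch("row1".getBytes()).getIndexedRecord("v1").orElse("<delete>"));
    System.out.println(new OverwriteWithLatestSketch(new byte[0]).getIndexedRecord("v1").orElse("<delete>"));
  }
}
```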
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -414,27 +420,34 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecordWithEx
Schema existingSchema,
Schema writeSchemaWithMetaFields,
HoodieWriteConfig config,
- HoodieRecordMerger recordMerger,
- BaseKeyGenerator keyGenerator) throws IOException {
-    Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(existing, existingSchema,
-        incoming, writeSchemaWithMetaFields, config.getProps());
- if (!mergeResult.isPresent()) {
- //the record was deleted
- return Option.empty();
+ BufferedRecordMerger<R> recordMerger,
+ BaseKeyGenerator keyGenerator,
+ RecordContext<R> incomingRecordContext,
+ RecordContext<R> existingRecordContext,
+ String[] orderingFieldNames) throws IOException {
+ Option<BufferedRecord<R>> mergeResult = merge(
+        incoming, existing, writeSchemaWithMetaFields, existingSchema, incomingRecordContext, orderingFieldNames, recordMerger, config.getProps());
+ HoodieRecord<R> result;
+ if (mergeResult.isPresent()) {
+ if (mergeResult.get().isDelete()) {
+ //the record was deleted
+ return Option.empty();
+ }
+ result = existingRecordContext.constructHoodieRecord(mergeResult.get());
+ } else {
+ result = existing;
}
- HoodieRecord<R> result = mergeResult.get().getLeft();
if (result.getData().equals(HoodieRecord.SENTINEL)) {
//the record did not match and merge case and should not be modified
return Option.of(result);
}
//record is inserted or updated
-    String partitionPath = keyGenerator.getPartitionPath((GenericRecord) result.getData());
+    String partitionPath = keyGenerator.getPartitionPath(existingRecordContext.convertToAvroRecord(mergeResult.get().getRecord(), writeSchemaWithMetaFields));
Review Comment:
In the `AvroRecordContext.convertToAvroRecord(record, schema)` impl, the schema arg is never used. Not sure where else we are using this API from the record context; just thought I would point it out.
Here, for fetching the partition path, it may not matter, but it could if we are using it elsewhere.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -414,27 +420,34 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecordWithEx
Schema existingSchema,
Schema writeSchemaWithMetaFields,
HoodieWriteConfig config,
- HoodieRecordMerger recordMerger,
- BaseKeyGenerator keyGenerator) throws IOException {
-    Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(existing, existingSchema,
-        incoming, writeSchemaWithMetaFields, config.getProps());
- if (!mergeResult.isPresent()) {
- //the record was deleted
- return Option.empty();
+ BufferedRecordMerger<R> recordMerger,
+ BaseKeyGenerator keyGenerator,
+ RecordContext<R> incomingRecordContext,
+ RecordContext<R> existingRecordContext,
+ String[] orderingFieldNames) throws IOException {
+ Option<BufferedRecord<R>> mergeResult = merge(
Review Comment:
Don't we need to call `finalMerge` here? `merge()` is invoking `deltaMerge` internally.
In the case of dedup, I agree we can call `deltaMerge`. But for the global index purpose, shouldn't we be calling `finalMerge`?
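To illustrate the distinction being raised (toy semantics only, not the real `BufferedRecordMerger` API): `deltaMerge` pre-combines two incoming versions of a key, while `finalMerge` reconciles the incoming side against the existing base record, where a winning delete must drop the row:

```java
import java.util.Optional;

// Toy records and mergers sketching deltaMerge vs finalMerge semantics.
class MergeSemanticsSketch {
  static final class Rec {
    final long orderingValue;
    final String data;
    final boolean delete;
    Rec(long orderingValue, String data, boolean delete) {
      this.orderingValue = orderingValue;
      this.data = data;
      this.delete = delete;
    }
  }

  // deltaMerge: pre-combine of two incoming versions; with event-time
  // ordering the higher ordering value wins, regardless of arrival order.
  static Rec deltaMerge(Rec older, Rec newer) {
    return newer.orderingValue >= older.orderingValue ? newer : older;
  }

  // finalMerge: reconciles the existing (base) record with the combined
  // incoming record; a winning delete drops the row entirely, which a plain
  // deltaMerge-based dedup path does not do on its own.
  static Optional<Rec> finalMerge(Rec existing, Rec incoming) {
    Rec winner = deltaMerge(existing, incoming);
    return winner.delete ? Optional.empty() : Optional.of(winner);
  }

  public static void main(String[] args) {
    Rec base = new Rec(1, "v1", false);
    Rec del = new Rec(2, null, true);
    System.out.println(finalMerge(base, del).isPresent());
  }
}
```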
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -493,10 +513,36 @@ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdatesIfNeeded(
.filter(p -> p.getRight().isPresent())
        .map(p -> Pair.of(p.getRight().get().getPartitionPath(), p.getRight().get().getFileId()))
.distinct(updatedConfig.getGlobalIndexReconcileParallelism());
+ // define the buffered record merger.
+    ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>) hoodieTable.getContext()
+        .<R>getReaderContextFactoryDuringWrite(hoodieTable.getMetaClient(), config.getRecordMerger().getRecordType(), config.getProps());
+ HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
+ RecordContext<R> incomingRecordContext = readerContext.getRecordContext();
+    incomingRecordContext.updateRecordKeyExtractor(hoodieTable.getMetaClient().getTableConfig(), false);
+ readerContext.initRecordMerger(config.getProps());
+    // Create a reader context for the existing records. In the case of merge-into commands, the incoming records
+    // can be using an expression payload so here we rely on the table's configured payload class if it is required.
+    ReaderContextFactory<R> readerContextFactoryForExistingRecords = (ReaderContextFactory<R>) hoodieTable.getContext()
+        .<R>getReaderContextFactoryDuringWrite(hoodieTable.getMetaClient(), config.getRecordMerger().getRecordType(), hoodieTable.getMetaClient().getTableConfig().getProps());
+    RecordContext<R> existingRecordContext = readerContextFactoryForExistingRecords.getContext().getRecordContext();
// merged existing records with current locations being set
-    HoodieData<HoodieRecord<R>> existingRecords = getExistingRecords(globalLocations, keyGeneratorWriteConfigOpt.getLeft(), hoodieTable);
-
-    final HoodieRecordMerger recordMerger = updatedConfig.getRecordMerger();
+    HoodieData<HoodieRecord<R>> existingRecords =
+        getExistingRecords(globalLocations, keyGeneratorWriteConfigOpt.getLeft(), hoodieTable, readerContextFactoryForExistingRecords);
+ List<String> orderingFieldNames = getOrderingFieldNames(
+        readerContext.getMergeMode(), hoodieTable.getConfig().getProps(), hoodieTable.getMetaClient());
+    RecordMergeMode recordMergeMode = HoodieTableConfig.inferCorrectMergingBehavior(null, config.getPayloadClass(), null,
+        String.join(",", orderingFieldNames), hoodieTable.getMetaClient().getTableConfig().getTableVersion()).getLeft();
+ BufferedRecordMerger<R> recordMerger = BufferedRecordMergerFactory.create(
+ readerContext,
+ recordMergeMode,
+ false,
Review Comment:
We cannot set `enablePartialMerging` to false blindly here, right?
What if this is part of a MERGE INTO command with partial encoding? Then the incoming record could have a partial set of columns, while the existing record contains all the data columns.
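To illustrate the concern with a toy map-based merge (not the real partial-update codepath): with partial encoding the incoming record only carries the columns the MERGE INTO statement set, so a full overwrite silently drops the untouched columns:

```java
import java.util.HashMap;
import java.util.Map;

class PartialMergeSketch {
  // Partial merge: columns absent from the incoming record are retained
  // from the existing record.
  static Map<String, Object> partialMerge(Map<String, Object> existing, Map<String, Object> incomingPartial) {
    Map<String, Object> merged = new HashMap<>(existing);
    merged.putAll(incomingPartial);
    return merged;
  }

  // Full merge with partial merging hard-coded off: the incoming record
  // replaces the row wholesale, so columns the statement never set are lost.
  static Map<String, Object> fullMerge(Map<String, Object> existing, Map<String, Object> incoming) {
    return new HashMap<>(incoming);
  }

  public static void main(String[] args) {
    Map<String, Object> existing = new HashMap<>();
    existing.put("id", 1);
    existing.put("name", "a");
    existing.put("price", 10);
    Map<String, Object> incoming = new HashMap<>();
    incoming.put("id", 1);
    incoming.put("price", 12); // MERGE INTO only updated price
    System.out.println(partialMerge(existing, incoming).containsKey("name")); // retained
    System.out.println(fullMerge(existing, incoming).containsKey("name"));    // dropped
  }
}
```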
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -85,9 +102,77 @@ public I combineOnCondition(
* @return Collection of HoodieRecord already be deduplicated
*/
public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int parallelism) {
+    HoodieReaderContext<T> readerContext =
+        (HoodieReaderContext<T>) table.getContext().<T>getReaderContextFactoryDuringWrite(table.getMetaClient(), table.getConfig().getRecordMerger().getRecordType(), table.getConfig().getProps())
+        .getContext();
+    readerContext.getRecordContext().updateRecordKeyExtractor(table.getMetaClient().getTableConfig(), false);
+    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+    readerContext.initRecordMerger(table.getConfig().getProps());
+    List<String> orderingFieldNames = HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(), table.getConfig().getProps(), table.getMetaClient());
     HoodieRecordMerger recordMerger = HoodieRecordUtils.mergerToPreCombineMode(table.getConfig().getRecordMerger());
-    return deduplicateRecords(records, table.getIndex(), parallelism, table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
+    RecordMergeMode recordMergeMode = HoodieTableConfig.inferCorrectMergingBehavior(null, table.getConfig().getPayloadClass(), null,
Review Comment:
Why is the merge mode set to null? Shouldn't we fetch it from the table config and set it here?
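A toy mirror of the resolution order being asked for (the real logic lives in `HoodieTableConfig.inferCorrectMergingBehavior`; this sketch only shows why passing `null` forces inference even when the table config already carries an explicit mode):

```java
class MergeModeSketch {
  enum RecordMergeMode { EVENT_TIME_ORDERING, COMMIT_TIME_ORDERING }

  // If an explicit mode is passed (e.g. read from the table config), it wins;
  // passing null always falls through to inference from the ordering fields.
  static RecordMergeMode resolve(RecordMergeMode configured, String orderingFields) {
    if (configured != null) {
      return configured;
    }
    return (orderingFields == null || orderingFields.isEmpty())
        ? RecordMergeMode.COMMIT_TIME_ORDERING
        : RecordMergeMode.EVENT_TIME_ORDERING;
  }

  public static void main(String[] args) {
    // With null, a table explicitly configured for COMMIT_TIME_ORDERING but
    // having ordering fields would be inferred as EVENT_TIME_ORDERING.
    System.out.println(resolve(null, "ts"));
    System.out.println(resolve(RecordMergeMode.COMMIT_TIME_ORDERING, "ts"));
  }
}
```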
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -445,40 +458,47 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecord(
HoodieRecord<R> existing,
Schema writeSchema,
HoodieWriteConfig config,
- HoodieRecordMerger recordMerger,
- Option<BaseKeyGenerator> expressionPayloadKeygen) throws IOException {
+ BufferedRecordMerger<R> recordMerger,
+ Option<BaseKeyGenerator> expressionPayloadKeygen,
+ RecordContext<R> incomingRecordContext,
+ RecordContext<R> existingRecordContext,
+ String[] orderingFieldNames) throws IOException {
    Schema existingSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
    Schema writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema, config.allowOperationMetadataField());
if (expressionPayloadKeygen.isPresent()) {
      return mergeIncomingWithExistingRecordWithExpressionPayload(incoming, existing, writeSchema,
-          existingSchema, writeSchemaWithMetaFields, config, recordMerger, expressionPayloadKeygen.get());
+          existingSchema, writeSchemaWithMetaFields, config, recordMerger, expressionPayloadKeygen.get(), incomingRecordContext, existingRecordContext, orderingFieldNames);
} else {
      // prepend the hoodie meta fields as the incoming record does not have them
HoodieRecord incomingPrepended = incoming
          .prependMetaFields(writeSchema, writeSchemaWithMetaFields, new MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()), config.getProps());
-      // after prepending the meta fields, convert the record back to the original payload
- HoodieRecord incomingWithMetaFields = incomingPrepended
-          .wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields, config.getProps(), Option.empty(), config.allowOperationMetadataField(), Option.empty(), false, Option.empty());
- Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger
-          .merge(existing, existingSchema, incomingWithMetaFields, writeSchemaWithMetaFields, config.getProps());
+ Option<BufferedRecord<R>> mergeResult = merge(
Review Comment:
I see you are adding support in the record context to fetch the record key for incoming records, using
```
incomingRecordContext.updateRecordKeyExtractor(hoodieTable.getMetaClient().getTableConfig(), false);
```
below.
Can we do the same here? This prepending is rewriting the entire record with the new schema, right?
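To illustrate the cost difference being pointed at (toy sketch; the real code rewrites an Avro record into the meta-field schema): prepending meta fields copies every column of the record, while a configured key extractor only reads the key field:

```java
import java.util.HashMap;
import java.util.Map;

class KeyExtractionSketch {
  // Strategy 1: rewrite the whole record into the meta-field schema just to
  // carry the key; every column is copied.
  static Map<String, Object> prependMetaFields(Map<String, Object> record, String key) {
    Map<String, Object> rewritten = new HashMap<>(record); // full copy
    rewritten.put("_hoodie_record_key", key);
    return rewritten;
  }

  // Strategy 2: a record-key extractor reads the key field directly, with no
  // record rewrite (what updateRecordKeyExtractor enables for incoming records).
  static String extractKey(Map<String, Object> record, String keyField) {
    return String.valueOf(record.get(keyField));
  }

  public static void main(String[] args) {
    Map<String, Object> rec = new HashMap<>();
    rec.put("uuid", "k1");
    rec.put("payload", "big-blob");
    System.out.println(extractKey(rec, "uuid"));             // no rewrite
    System.out.println(prependMetaFields(rec, "k1").size()); // full copy + meta field
  }
}
```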
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]