the-other-tim-brown commented on code in PR #13600:
URL: https://github.com/apache/hudi/pull/13600#discussion_r2252938946
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -444,41 +459,46 @@ private static <R> Option<HoodieRecord<R>>
mergeIncomingWithExistingRecord(
HoodieRecord<R> incoming,
HoodieRecord<R> existing,
Schema writeSchema,
+ Schema writeSchemaWithMetaFields,
HoodieWriteConfig config,
- HoodieRecordMerger recordMerger,
- Option<BaseKeyGenerator> expressionPayloadKeygen) throws IOException {
- Schema existingSchema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(config.getSchema()),
config.allowOperationMetadataField());
- Schema writeSchemaWithMetaFields =
HoodieAvroUtils.addMetadataFields(writeSchema,
config.allowOperationMetadataField());
+ BufferedRecordMerger<R> recordMerger,
+ Option<BaseKeyGenerator> expressionPayloadKeygen,
+ RecordContext<R> incomingRecordContext,
+ RecordContext<R> existingRecordContext,
+ String[] orderingFieldNames) throws IOException {
if (expressionPayloadKeygen.isPresent()) {
return mergeIncomingWithExistingRecordWithExpressionPayload(incoming,
existing, writeSchema,
- existingSchema, writeSchemaWithMetaFields, config, recordMerger,
expressionPayloadKeygen.get());
+ writeSchemaWithMetaFields, config, recordMerger,
expressionPayloadKeygen.get(), incomingRecordContext, existingRecordContext,
orderingFieldNames);
} else {
// prepend the hoodie meta fields as the incoming record does not have
them
HoodieRecord incomingPrepended = incoming
.prependMetaFields(writeSchema, writeSchemaWithMetaFields, new
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()),
config.getProps());
- // after prepend the meta fields, convert the record back to the
original payload
- HoodieRecord incomingWithMetaFields = incomingPrepended
- .wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields,
config.getProps(), Option.empty(), config.allowOperationMetadataField(),
Option.empty(), false, Option.empty());
- Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger
- .merge(existing, existingSchema, incomingWithMetaFields,
writeSchemaWithMetaFields, config.getProps());
- if (mergeResult.isPresent()) {
- // the merged record needs to be converted back to the original payload
- HoodieRecord<R> merged =
mergeResult.get().getLeft().wrapIntoHoodieRecordPayloadWithParams(
- writeSchemaWithMetaFields, config.getProps(), Option.empty(),
- config.allowOperationMetadataField(), Option.empty(), false,
Option.of(writeSchema));
- return Option.of(merged);
- } else {
+ BufferedRecord<R> incomingBufferedRecord =
BufferedRecord.forRecordWithContext(incomingPrepended,
writeSchemaWithMetaFields, incomingRecordContext, config.getProps(),
orderingFieldNames);
+ BufferedRecord<R> existingBufferedRecord =
BufferedRecord.forRecordWithContext(existing, writeSchemaWithMetaFields,
existingRecordContext, config.getProps(), orderingFieldNames);
+ MergeResult<R> mergeResult =
recordMerger.finalMerge(existingBufferedRecord, incomingBufferedRecord);
+
+ if (mergeResult.isDelete()) {
+ // the record was deleted
return Option.empty();
}
+ BufferedRecord<R> resultingBufferedRecord =
BufferedRecord.forRecordWithContext(mergeResult.getMergedRecord(),
writeSchemaWithMetaFields, existingRecordContext,
+ orderingFieldNames, existing.getRecordKey(),
mergeResult.getMergedRecord() == null);
+ HoodieRecord<R> result =
existingRecordContext.constructHoodieRecord(resultingBufferedRecord);
Review Comment:
The `existingRecordContext` has the proper payload class specified for the
output since the `incomingRecordContext` can have the `ExpressionPayload`. The
line is required right now. It converts the value back to a record without the
meta-fields set. When I tried to remove this, the tests failed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]