nsivabalan commented on code in PR #13699:
URL: https://github.com/apache/hudi/pull/13699#discussion_r2279080060
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -198,8 +270,10 @@ public void doMerge() {
}
// Writes the record
try {
-        writeToFile(record.getKey(), record, writeSchemaWithMetaFields, config.getPayloadConfig().getProps(), preserveMetadata);
+        // if the record is not being updated and is not a new insert for the file group, we must preserve the existing record metadata.
+        boolean shouldPreserveRecordMetadata = preserveMetadata || record.getOperation() == null;
Review Comment:
thanks
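(Editorial note: the rule the new flag encodes, keep existing record metadata whenever the record carries no operation, i.e. it is neither an update nor a new insert for this file group, can be sketched standalone. The enum and method names below are illustrative only, not the actual Hudi API.)

```java
// Illustrative sketch only: 'Op' and the method name are hypothetical,
// standing in for HoodieOperation and the logic in the diff above.
enum Op { INSERT, UPDATE_AFTER }

public class PreserveMetadataSketch {
  // Preserve metadata when the caller asked for it, or when the record
  // has no operation (it was carried over unchanged from the base file).
  static boolean shouldPreserveRecordMetadata(boolean preserveMetadata, Op operation) {
    return preserveMetadata || operation == null;
  }

  public static void main(String[] args) {
    System.out.println(shouldPreserveRecordMetadata(false, null));            // carried-over record: true
    System.out.println(shouldPreserveRecordMetadata(false, Op.UPDATE_AFTER)); // updated record: false
  }
}
```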
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -526,14 +524,14 @@ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdatesIfNeeded(
         .distinct(updatedConfig.getGlobalIndexReconcileParallelism());
     // define the buffered record merger.
     ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>) hoodieTable.getContext()
-        .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), config.getRecordMerger().getRecordType(), config.getProps());
+        .getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), config.getRecordMerger().getRecordType(), config.getProps(), true);
Review Comment:
sg
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -80,17 +101,56 @@ public BufferedRecord<T> processUpdate(String recordKey, BufferedRecord<T> previ
       }
return null;
} else {
- T prevRow = previousRecord != null ? previousRecord.getRecord() : null;
- T mergedRow = mergedRecord.getRecord();
- if (prevRow != null && prevRow != mergedRow) {
- mergedRecord.setHoodieOperation(HoodieOperation.UPDATE_AFTER);
- readStats.incrementNumUpdates();
- } else if (prevRow == null) {
- mergedRecord.setHoodieOperation(HoodieOperation.INSERT);
- readStats.incrementNumInserts();
+ return handleNonDeletes(previousRecord, mergedRecord);
+ }
+ }
+
+  protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> previousRecord, BufferedRecord<T> mergedRecord) {
+ T prevRow = previousRecord != null ? previousRecord.getRecord() : null;
+ T mergedRow = mergedRecord.getRecord();
+ if (prevRow != null && prevRow != mergedRow) {
+ mergedRecord.setHoodieOperation(HoodieOperation.UPDATE_AFTER);
+ readStats.incrementNumUpdates();
+ } else if (prevRow == null) {
+ mergedRecord.setHoodieOperation(HoodieOperation.INSERT);
+ readStats.incrementNumInserts();
+ }
+ return mergedRecord.seal(readerContext.getRecordContext());
+ }
+ }
+
+ class PayloadUpdateProcessor<T> extends StandardUpdateProcessor<T> {
Review Comment:
   In the interest of landing the patch in time, we can go ahead if CI is green. But I would recommend filing one follow-up Jira to cover all these minor things; we can take it up during the bug-fix period as well.
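(Editorial note: the classification `handleNonDeletes` performs in the diff above reduces to a small pure function. The sketch below simplifies the types; only the enum values and the reference-equality check mirror the actual diff.)

```java
// Sketch of the operation-tagging rule in handleNonDeletes.
enum HoodieOp { INSERT, UPDATE_AFTER }

public class NonDeleteClassifier {
  // Note the reference comparison (prevRow != mergedRow), as in the diff:
  // an unchanged record is detected by object identity, not equals().
  static HoodieOp classify(Object prevRow, Object mergedRow) {
    if (prevRow != null && prevRow != mergedRow) {
      return HoodieOp.UPDATE_AFTER; // existing row replaced by a merged row
    } else if (prevRow == null) {
      return HoodieOp.INSERT;       // no previous row: new record for the file group
    }
    return null;                    // same object: record passed through unchanged
  }

  public static void main(String[] args) {
    Object row = new Object();
    System.out.println(classify(null, row));         // INSERT
    System.out.println(classify(row, new Object())); // UPDATE_AFTER
    System.out.println(classify(row, row));          // null (no operation tagged)
  }
}
```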
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -251,6 +344,31 @@ public List<WriteStatus> close() {
}
}
+ private Option<BaseFileUpdateCallback<T>> createCallback() {
+ List<BaseFileUpdateCallback<T>> callbacks = new ArrayList<>();
+ // Handle CDC workflow.
+ if (cdcLogger.isPresent()) {
+ callbacks.add(new CDCCallback<>(cdcLogger.get(), readerContext));
+ }
+ // Indexes are not updated during compaction
+ if (operation.isEmpty()) {
+ // record index callback
+ if (this.writeStatus.isTrackingSuccessfulWrites()) {
+ writeStatus.manuallyTrackSuccess();
+        callbacks.add(new RecordLevelIndexCallback<>(writeStatus, newRecordLocation, partitionPath));
+ }
+ // Stream secondary index stats.
+ if (isSecondaryIndexStatsStreamingWritesEnabled) {
+ callbacks.add(new SecondaryIndexCallback<>(
Review Comment:
   I see. But it should be possible to have just one callback for both RLI and SI, right? I see it's simpler and more maintainable w/ two diff callbacks for now, so this is not really a blocking comment. We can revisit it in the future if it causes any perf issues.
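(Editorial note: a single composite callback in the spirit of this suggestion could fan one event out to both index trackers. The interface and method below are hypothetical stand-ins, not Hudi's `BaseFileUpdateCallback` API; only the fan-out pattern is the point.)

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical callback interface standing in for the RLI/SI callbacks above.
interface UpdateCallback {
  void onUpdate(String recordKey);
}

public class CompositeCallback implements UpdateCallback {
  private final List<UpdateCallback> delegates;

  CompositeCallback(UpdateCallback... delegates) {
    this.delegates = Arrays.asList(delegates);
  }

  @Override
  public void onUpdate(String recordKey) {
    // One dispatch point; both RLI- and SI-style trackers get the same event.
    for (UpdateCallback delegate : delegates) {
      delegate.onUpdate(recordKey);
    }
  }

  public static void main(String[] args) {
    StringBuilder log = new StringBuilder();
    UpdateCallback rli = key -> log.append("rli:").append(key).append(' ');
    UpdateCallback si = key -> log.append("si:").append(key).append(' ');
    new CompositeCallback(rli, si).onUpdate("k1");
    System.out.println(log.toString().trim()); // rli:k1 si:k1
  }
}
```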
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -397,6 +401,9 @@ protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId,
mergeHandle.setPartitionFields(partitionFields);
mergeHandle.setPartitionValues(partitionValues);
}
+    if (readerContextFactory != null && mergeHandle instanceof FileGroupReaderBasedMergeHandle) {
Review Comment:
   Gotcha, thanks for the context. The Spark-specific requirement is what makes this tricky.
   I am good then.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java:
##########
@@ -76,7 +72,7 @@ abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T
protected final RecordMergeMode recordMergeMode;
protected final PartialUpdateMode partialUpdateMode;
protected final Option<HoodieRecordMerger> recordMerger;
- protected final Option<String> payloadClass;
+ protected final Option<Pair<String, String>> payloadClasses;
Review Comment:
   thanks.
   Trying to think through what our path forward looks like as we move completely away from payloads. Until we retire expression payloads from Merge Into, we have to assume all records will have some payload associated with them, right? Curious how you are tackling this in your next patch, essentially what changes we intend to make to ExpressionPayloadRecordMerger.
   For eg, if someone creates a new table in v9 today w/o setting any payload class (after your next patch), but just setting the merge mode, and they issue MIT writes, what will happen w/ ExpressionPayloadRecordMerger? Currently, it converts both the base record and the incoming record to HoodieAvroRecord, which is in payload format, before it can actually merge them.
   Maybe you have already thought through this or fixed it in your draft patch; I was just curious to understand. We can jam f2f to align if need be.
   Note: this is not a blocking comment for landing this patch.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -156,40 +215,53 @@ private void init(CompactionOperation operation, String partitionPath) {
       fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, newFilePath, hoodieTable.getStorage(),
           config, writeSchemaWithMetaFields, taskContextSupplier, recordType);
     } catch (IOException io) {
-      LOG.error("Error in update task at commit {}", instantTime, io);
       writeStatus.setGlobalError(io);
       throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit "
           + instantTime + " on path " + hoodieTable.getMetaClient().getBasePath(), io);
     }
   }
+  @Override
+  protected void populateIncomingRecordsMap(Iterator<HoodieRecord<T>> newRecordsItr) {
+    // no op.
+  }
+
+  /**
+   * This is only for Spark: the engine context fetched from a serialized hoodie table is always local,
+   * so this overrides it with the Spark-specific reader context.
+   */
+  public void setReaderContext(HoodieReaderContext<T> readerContext) {
+    this.readerContext = readerContext;
+  }
+
/**
* Reads the file slice of a compaction operation using a file group reader,
* by getting an iterator of the records; then writes the records to a new
base file.
*/
@Override
public void doMerge() {
+    // For non-compaction operations, the merger needs to be initialized with the writer properties to handle cases like Merge-Into commands
+    if (operation.isEmpty()) {
+      this.readerContext.initRecordMergerForIngestion(config.getProps());
+    }
     boolean usePosition = config.getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
-    Option<InternalSchema> internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema());
-    TypedProperties props = TypedProperties.copy(config.getProps());
+    Option<InternalSchema> internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema())
+        .map(internalSchema -> AvroSchemaEvolutionUtils.reconcileSchema(writeSchemaWithMetaFields, internalSchema,
+            config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS)));
     long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(taskContextSupplier, config);
     props.put(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), String.valueOf(maxMemoryPerCompaction));
-    Stream<HoodieLogFile> logFiles = operation.getDeltaFileNames().stream().map(logFileName ->
+    Option<Stream<HoodieLogFile>> logFilesStreamOpt = operation.map(op -> op.getDeltaFileNames().stream().map(logFileName ->
         new HoodieLogFile(new StoragePath(FSUtils.constructAbsolutePath(
-            config.getBasePath(), operation.getPartitionPath()), logFileName)));
+            config.getBasePath(), op.getPartitionPath()), logFileName))));
     // Initializes file group reader
-    try (HoodieFileGroupReader<T> fileGroupReader = HoodieFileGroupReader.<T>newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
-        .withLatestCommitTime(maxInstantTime).withPartitionPath(partitionPath).withBaseFileOption(Option.ofNullable(baseFileToMerge)).withLogFiles(logFiles)
-        .withDataSchema(writeSchemaWithMetaFields).withRequestedSchema(writeSchemaWithMetaFields).withInternalSchema(internalSchemaOption).withProps(props)
-        .withShouldUseRecordPosition(usePosition).withSortOutput(hoodieTable.requireSortedRecords())
-        .withFileGroupUpdateCallback(cdcLogger.map(logger -> new CDCCallback(logger, readerContext))).withEnableOptimizedLogBlockScan(config.enableOptimizedLogBlocksScan()).build()) {
+    try (HoodieFileGroupReader<T> fileGroupReader = getFileGroupReader(usePosition, internalSchemaOption, props, logFilesStreamOpt, incomingRecordsItr)) {
       // Reads the records from the file slice
       try (ClosableIterator<HoodieRecord<T>> recordIterator = fileGroupReader.getClosableHoodieRecordIterator()) {
         while (recordIterator.hasNext()) {
           HoodieRecord<T> record = recordIterator.next();
+          Option<Map<String, String>> recordMetadata = getRecordMetadata(record, writeSchema, props);
Review Comment:
   Oh, we should avoid event time metadata tracking for compaction commits, right? How are we ensuring that?
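(Editorial note: one way to enforce the guard the reviewer is asking about, sketched with placeholder types. In the real handle the check would presumably key off the compaction operation being present; nothing below is actual Hudi code.)

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

// Sketch only: gate event-time metadata extraction so it runs for
// ingestion writes but is skipped for compaction commits.
public class EventTimeGuard {
  // 'isCompaction' would correspond to operation.isPresent() in the merge handle.
  static Optional<Map<String, String>> recordMetadata(boolean isCompaction, String eventTime) {
    if (isCompaction) {
      return Optional.empty(); // compaction only rewrites data; skip event-time stats
    }
    return Optional.of(Collections.singletonMap("eventTime", eventTime));
  }

  public static void main(String[] args) {
    System.out.println(recordMetadata(true, "123").isPresent());  // false for compaction
    System.out.println(recordMetadata(false, "123").isPresent()); // true for ingestion
  }
}
```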
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]