the-other-tim-brown commented on code in PR #13699:
URL: https://github.com/apache/hudi/pull/13699#discussion_r2266753079
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -75,13 +85,60 @@
public class FileGroupReaderBasedMergeHandle<T, I, K, O> extends
HoodieWriteMergeHandle<T, I, K, O> {
private static final Logger LOG =
LoggerFactory.getLogger(FileGroupReaderBasedMergeHandle.class);
- private final HoodieReaderContext<T> readerContext;
- private final CompactionOperation operation;
+ private final Option<CompactionOperation> operation;
private final String maxInstantTime;
+ private HoodieReaderContext<T> readerContext;
private HoodieReadStats readStats;
- private final HoodieRecord.HoodieRecordType recordType;
- private final Option<HoodieCDCLogger> cdcLogger;
+ private HoodieRecord.HoodieRecordType recordType;
+ private Option<HoodieCDCLogger> cdcLogger;
+ private Option<RecordLevelIndexCallback> recordIndexCallbackOpt;
+ private Option<SecondaryIndexCallback> secondaryIndexCallbackOpt;
+ private final boolean isCompaction;
+ private final TypedProperties props;
+ private Iterator<HoodieRecord> incomingRecordsItr;
+ /**
+ * Constructor for Copy-On-Write (COW) merge path.
+ * Takes in a base path and an iterator of records to be merged with that
file.
+ * @param config instance of {@link HoodieWriteConfig} to use.
+ * @param instantTime instant time of the current commit.
+ * @param hoodieTable instance of {@link HoodieTable} being updated.
+ * @param recordItr iterator of records to be merged with the file.
+ * @param partitionPath partition path of the base file.
+ * @param fileId file ID of the base file.
+ * @param taskContextSupplier instance of {@link TaskContextSupplier} to use.
+ * @param keyGeneratorOpt optional instance of {@link BaseKeyGenerator} to
use for extracting keys from records.
+ * @param readerContext instance of {@link HoodieReaderContext} to use while
merging for accessing fields and transforming records.
+ */
+ public FileGroupReaderBasedMergeHandle(HoodieWriteConfig config, String
instantTime, HoodieTable<T, I, K, O> hoodieTable,
+ Iterator<HoodieRecord<T>> recordItr,
String partitionPath, String fileId,
+ TaskContextSupplier
taskContextSupplier, Option<BaseKeyGenerator> keyGeneratorOpt,
HoodieReaderContext<T> readerContext) {
Review Comment:
The issue is that the merge handles are created on the executors in Spark, so
`hoodieTable.getContext` will always return a local engine context instead
of a Spark engine context when one is required.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -267,26 +393,141 @@ private static class CDCCallback<T> implements
BaseFileUpdateCallback<T> {
}
@Override
- public void onUpdate(String recordKey, T previousRecord, T mergedRecord) {
+ public void onUpdate(String recordKey, BufferedRecord<T> previousRecord,
BufferedRecord<T> mergedRecord) {
cdcLogger.put(recordKey, convertOutput(previousRecord),
Option.of(convertOutput(mergedRecord)));
-
}
@Override
- public void onInsert(String recordKey, T newRecord) {
+ public void onInsert(String recordKey, BufferedRecord<T> newRecord) {
cdcLogger.put(recordKey, null, Option.of(convertOutput(newRecord)));
-
}
@Override
- public void onDelete(String recordKey, T previousRecord) {
+ public void onDelete(String recordKey, BufferedRecord<T> previousRecord,
HoodieOperation hoodieOperation) {
cdcLogger.put(recordKey, convertOutput(previousRecord), Option.empty());
-
}
- private GenericRecord convertOutput(T record) {
- T convertedRecord = outputConverter.get().map(converter -> record ==
null ? null : converter.apply(record)).orElse(record);
+ private GenericRecord convertOutput(BufferedRecord<T> record) {
+ T convertedRecord = outputConverter.get().map(converter -> record ==
null ? null : converter.apply(record.getRecord())).orElse(record.getRecord());
return convertedRecord == null ? null :
readerContext.getRecordContext().convertToAvroRecord(convertedRecord,
requestedSchema.get());
}
}
+
+ private static class RecordLevelIndexCallback<T> implements
BaseFileUpdateCallback<T> {
+ private final WriteStatus writeStatus;
+ private final HoodieRecordLocation fileRecordLocation;
+ private final String partitionPath;
+
+ public RecordLevelIndexCallback(WriteStatus writeStatus,
HoodieRecordLocation fileRecordLocation, String partitionPath) {
+ this.writeStatus = writeStatus;
+ this.fileRecordLocation = fileRecordLocation;
+ this.partitionPath = partitionPath;
+ }
+
+ @Override
+ public void onUpdate(String recordKey, BufferedRecord<T> previousRecord,
BufferedRecord<T> mergedRecord) {
+ writeStatus.addRecordDelegate(HoodieRecordDelegate.create(recordKey,
partitionPath, fileRecordLocation, fileRecordLocation,
mergedRecord.getHoodieOperation() == HoodieOperation.UPDATE_BEFORE));
+ }
+
+ @Override
+ public void onInsert(String recordKey, BufferedRecord<T> newRecord) {
+ writeStatus.addRecordDelegate(HoodieRecordDelegate.create(recordKey,
partitionPath, null, fileRecordLocation, newRecord.getHoodieOperation() ==
HoodieOperation.UPDATE_BEFORE));
Review Comment:
It's always false today, but do we want to keep this parameter in case a
future scenario arises where it is not?
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java:
##########
@@ -139,15 +139,16 @@ public abstract <I, K, V> List<V> reduceByKey(
/**
* Returns reader context factory for write operations in the table.
*
- * @param metaClient Table meta client
- * @param recordType Record type
- * @param properties Typed properties
+ * @param metaClient Table meta client
+ * @param recordType Record type
+ * @param properties Typed properties
+ * @param outputsCustomPayloads Whether the reader context factory should
output custom payloads. Final merging of records before writes does not require
custom payloads.
*/
public ReaderContextFactory<?>
getReaderContextFactoryForWrite(HoodieTableMetaClient metaClient,
HoodieRecord.HoodieRecordType recordType,
-
TypedProperties properties) {
+
TypedProperties properties, boolean outputsCustomPayloads) {
Review Comment:
I haven't found a good way yet. This flag really represents two
different stages of the writer path: the dedupe/indexing stages and the final
write. In the final write, we never want to use the payload-based records,
since we just want the final indexed representation of the record.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/SecondaryIndexStreamingTracker.java:
##########
@@ -226,6 +228,81 @@ static <T> void trackSecondaryIndexStats(@Nullable
HoodieKey hoodieKey, Option<H
});
}
+ /**
+ * The utility function used by Merge Handle to generate secondary index
stats for the corresponding record.
+ * It considers the new merged version of the record and compares it with
the older version of the record to generate
+ * secondary index stats.
+ *
+ * @param hoodieKey The hoodie key
+ * @param combinedRecordOpt New record merged with the old record
+ * @param oldRecord The old record
+ * @param isDelete Whether the record is a DELETE
+ * @param writeStatus The Write status
+ * @param secondaryIndexDefns Definitions for secondary index which
need to be updated
+ */
+ static <T> void trackSecondaryIndexStats(HoodieKey hoodieKey,
Option<BufferedRecord<T>> combinedRecordOpt, @Nullable BufferedRecord<T>
oldRecord, boolean isDelete,
Review Comment:
This method mirrors the one above it but operates directly on `BufferedRecord`
instead of converting to `HoodieRecord`.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -267,26 +393,141 @@ private static class CDCCallback<T> implements
BaseFileUpdateCallback<T> {
}
@Override
- public void onUpdate(String recordKey, T previousRecord, T mergedRecord) {
+ public void onUpdate(String recordKey, BufferedRecord<T> previousRecord,
BufferedRecord<T> mergedRecord) {
cdcLogger.put(recordKey, convertOutput(previousRecord),
Option.of(convertOutput(mergedRecord)));
-
}
@Override
- public void onInsert(String recordKey, T newRecord) {
+ public void onInsert(String recordKey, BufferedRecord<T> newRecord) {
cdcLogger.put(recordKey, null, Option.of(convertOutput(newRecord)));
-
}
@Override
- public void onDelete(String recordKey, T previousRecord) {
+ public void onDelete(String recordKey, BufferedRecord<T> previousRecord,
HoodieOperation hoodieOperation) {
cdcLogger.put(recordKey, convertOutput(previousRecord), Option.empty());
-
}
- private GenericRecord convertOutput(T record) {
- T convertedRecord = outputConverter.get().map(converter -> record ==
null ? null : converter.apply(record)).orElse(record);
+ private GenericRecord convertOutput(BufferedRecord<T> record) {
+ T convertedRecord = outputConverter.get().map(converter -> record ==
null ? null : converter.apply(record.getRecord())).orElse(record.getRecord());
return convertedRecord == null ? null :
readerContext.getRecordContext().convertToAvroRecord(convertedRecord,
requestedSchema.get());
}
}
+
+ private static class RecordLevelIndexCallback<T> implements
BaseFileUpdateCallback<T> {
+ private final WriteStatus writeStatus;
+ private final HoodieRecordLocation fileRecordLocation;
+ private final String partitionPath;
+
+ public RecordLevelIndexCallback(WriteStatus writeStatus,
HoodieRecordLocation fileRecordLocation, String partitionPath) {
+ this.writeStatus = writeStatus;
+ this.fileRecordLocation = fileRecordLocation;
+ this.partitionPath = partitionPath;
+ }
+
+ @Override
+ public void onUpdate(String recordKey, BufferedRecord<T> previousRecord,
BufferedRecord<T> mergedRecord) {
+ writeStatus.addRecordDelegate(HoodieRecordDelegate.create(recordKey,
partitionPath, fileRecordLocation, fileRecordLocation,
mergedRecord.getHoodieOperation() == HoodieOperation.UPDATE_BEFORE));
Review Comment:
The write status will still be updated in the current code with this record
delegate even though `ignoreIndexUpdate` is true. This keeps parity with
the old system, but I am not sure of the original rationale for that behavior.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]