danny0405 commented on code in PR #13444:
URL: https://github.com/apache/hudi/pull/13444#discussion_r2196395996
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -316,27 +322,57 @@ protected boolean hasNextBaseRecord(T baseRecord,
BufferedRecord<T> logRecordInf
Pair<Boolean, T> isDeleteAndRecord = merge(baseRecordInfo,
logRecordInfo);
if (!isDeleteAndRecord.getLeft()) {
// Updates
- nextRecord = readerContext.seal(isDeleteAndRecord.getRight());
+ nextRecord =
readerContext.seal(applyOutputSchemaConversion(isDeleteAndRecord.getRight()));
+ if (baseFileUpdateCallback != null && isDeleteAndRecord.getRight() !=
baseRecord) {
+ // If the record is not the same as the base record, we can emit an
update
+ handleBaseFileUpdate(logRecordInfo.getRecordKey(), baseRecord,
nextRecord);
+ }
readStats.incrementNumUpdates();
return true;
- } else if (emitDelete) {
- // emit Deletes
- nextRecord = readerContext.getDeleteRow(isDeleteAndRecord.getRight(),
baseRecordInfo.getRecordKey());
- readStats.incrementNumDeletes();
- return nextRecord != null;
} else {
- // not emit Deletes
+ // emit Deletes
+ handleBaseFileDelete(logRecordInfo.getRecordKey(), baseRecord);
readStats.incrementNumDeletes();
- return false;
+ if (emitDelete) {
+ nextRecord =
applyOutputSchemaConversion(readerContext.getDeleteRow(isDeleteAndRecord.getRight(),
baseRecordInfo.getRecordKey()));
+ return nextRecord != null;
+ } else {
+ return false;
+ }
}
}
// Inserts
- nextRecord = readerContext.seal(baseRecord);
+ nextRecord = readerContext.seal(applyOutputSchemaConversion(baseRecord));
readStats.incrementNumInserts();
return true;
}
+ /**
+ * Applies the final output schema conversion to the buffered record if
required. This ensures the records match the requested schema.
+ * @param bufferedRecord the buffered record to convert
+ * @return a new buffered record with the converted record and the proper
schema ID set
+ */
+ protected BufferedRecord<T> applyOutputSchemaConversion(BufferedRecord<T>
bufferedRecord) {
Review Comment:
I see that `applyOutputSchemaConversion` has been applied in several places
under an optional condition, which is error-prone. It is okay to project the
record twice if the cost is low compared to the CDC logging IO: once for the
regular write, once for CDC logging.
Actually, there should be no projection on the CDC logging path at all,
because the record either comes from a regular COW write or from MOR
compaction, neither of which really has an explicit query-side projection.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]