nsivabalan commented on code in PR #13623:
URL: https://github.com/apache/hudi/pull/13623#discussion_r2231191870
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateStrategy.java:
##########
@@ -63,44 +68,82 @@ BufferedRecord<T> partialMerge(BufferedRecord<T> newRecord,
BufferedRecord<T> oldRecord,
Schema newSchema,
Schema oldSchema,
- boolean keepOldMetadataColumns) {
+ Schema readerSchema,
+ boolean keepOldMetadataColumns,
+ TypedProperties props) {
// Note that: When either newRecord or oldRecord is a delete record,
// skip partial update since delete records do not have
meaningful columns.
- if (partialUpdateMode == PartialUpdateMode.NONE
- || null == oldRecord
+ if (null == oldRecord
|| newRecord.isDelete()
|| oldRecord.isDelete()) {
return newRecord;
}
switch (partialUpdateMode) {
case KEEP_VALUES:
- case FILL_DEFAULTS:
- return newRecord;
+ return reconcileBasedOnKeepValues(newRecord, oldRecord, newSchema,
oldSchema, readerSchema, props);
case IGNORE_DEFAULTS:
return reconcileDefaultValues(
- newRecord, oldRecord, newSchema, oldSchema,
keepOldMetadataColumns);
- case IGNORE_MARKERS:
+ newRecord, oldRecord, newSchema, oldSchema,
keepOldMetadataColumns, false);
+ case IGNORE_DEFAULTS_NULLS:
+ return reconcileDefaultValues(newRecord, oldRecord, newSchema,
oldSchema, keepOldMetadataColumns, true);
+ case FILL_UNAVAILABLE:
return reconcileMarkerValues(
newRecord, oldRecord, newSchema, oldSchema);
default:
- return newRecord;
+ throw new HoodieIOException("Unsupported PartialUpdateMode " +
partialUpdateMode + " detected");
}
}
+ BufferedRecord<T> reconcileBasedOnKeepValues(BufferedRecord<T> newRecord,
+ BufferedRecord<T> oldRecord,
+ Schema newSchema,
+ Schema oldSchema,
+ Schema readerSchema,
+ TypedProperties props) {
+
+ // Merge and store the combined record
+ Option<Pair<BufferedRecord, Schema>> deleteHandlingResult =
handleDeletes(oldRecord, oldSchema, newRecord, newSchema, props);
+ if (deleteHandlingResult != null) {
+ return newRecord; // if deleted, return newRecord. newRecord.isDelete()
will return anyways.
+ }
+
+ return (BufferedRecord<T>)
keepValuesPartialMergingUtils.mergePartialRecords(oldRecord, oldSchema,
newRecord, newSchema, readerSchema, readerContext).getLeft();
+ }
+
+ /**
+ * Basic handling of deletes that is used by many of the spark mergers
+ * returns null if merger specific logic should be used
+ */
+ protected Option<Pair<BufferedRecord, Schema>> handleDeletes(BufferedRecord
older, Schema oldSchema, BufferedRecord newer, Schema newSchema,
TypedProperties props) {
+
+ if (newer.isDelete()) {
+ // Delete record
+ return Option.empty();
+ }
+
+ // old record
+ if (older.isDelete()) {
+ return Option.of(Pair.of(newer, newSchema));
+ }
+ return null;
+ }
+
/**
* @param newRecord The newer record determined by the merge
mode.
* @param oldRecord The older record determined by the merge
mode.
* @param newSchema The schema of the newer record.
* @param oldSchema The schema of the older record.
* @param keepOldMetadataColumns Keep the metadata columns from the older
record.
+ * @param includeNulls true if nulls are expected to be ignored as
well. false otherwise.
* @return
*/
BufferedRecord<T> reconcileDefaultValues(BufferedRecord<T> newRecord,
BufferedRecord<T> oldRecord,
Schema newSchema,
Schema oldSchema,
- boolean keepOldMetadataColumns) {
+ boolean keepOldMetadataColumns,
+ boolean includeNulls) {
Review Comment:
As commented above, it's not yet fully integrated, since the other PR is still open.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]