nsivabalan commented on code in PR #13498:
URL: https://github.com/apache/hudi/pull/13498#discussion_r2173931066


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##########
@@ -91,6 +92,17 @@ public Object getValue(InternalRow row, Schema schema, String fieldName) {
     return getFieldValueFromInternalRow(row, schema, fieldName);
   }
 
+  @Override
+  public void setValue(InternalRow row, Schema schema, String fieldName, Object value) {
+    Schema.Field field = schema.getField(fieldName);

Review Comment:
   I don't think your change is handling nested fields. 
   also, if I am not wrong, we might have to use something like below
   ```
       StructType structType = getCachedSchema(recordSchema);
       scala.Option<HoodieUnsafeRowUtils.NestedFieldPath> cachedNestedFieldPath =
           HoodieInternalRowUtils.getCachedPosList(structType, fieldName);
   ```
   to fetch the position of a field from InternalRow. 
   
   @yihua as well 
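   To illustrate the suggestion, setting a nested field means walking a resolved position path rather than using a single top-level index. Below is a minimal sketch with plain Java maps and dot-separated paths standing in for Spark's InternalRow and HoodieUnsafeRowUtils.NestedFieldPath; the class and method names are hypothetical, not the PR's code:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: resolve a dot-separated path ("a.b.c") through nested
// structures. The real implementation would resolve positions once against the
// cached StructType and reuse them, instead of splitting strings per record.
public class NestedFieldSketch {

  // Walk the path and set the leaf value, creating intermediate levels.
  @SuppressWarnings("unchecked")
  public static void setNestedValue(Map<String, Object> row, String fieldPath, Object value) {
    String[] parts = fieldPath.split("\\.");
    Map<String, Object> current = row;
    for (int i = 0; i < parts.length - 1; i++) {
      // Descend one level; a production version would validate against the schema.
      current = (Map<String, Object>) current.computeIfAbsent(parts[i], k -> new HashMap<>());
    }
    current.put(parts[parts.length - 1], value);
  }

  // Walk the path and read the leaf value, returning null on a missing level.
  @SuppressWarnings("unchecked")
  public static Object getNestedValue(Map<String, Object> row, String fieldPath) {
    Object current = row;
    for (String part : fieldPath.split("\\.")) {
      if (!(current instanceof Map)) {
        return null; // intermediate level missing or not a struct
      }
      current = ((Map<String, Object>) current).get(part);
    }
    return current;
  }
}
```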



##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -330,6 +340,15 @@ public boolean castToBoolean(Object value) {
     }
   }
 
+  public String castToString(Object value) {

Review Comment:
   with Spark's internal row, strings are represented as UTF8String, 
   so we should account for that as well, 
   or override this method for the Spark record type
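   For illustration only, a type-tolerant cast could look like the sketch below; it avoids a Spark dependency by relying on the fact that Spark's UTF8String implements CharSequence (the class name and the exact branch set are assumptions, not the PR's code):

```java
import java.nio.charset.StandardCharsets;

// Hedged sketch of a castToString that tolerates engine-specific string types.
// UTF8String is not referenced directly; since it implements CharSequence,
// the CharSequence branch below would cover it.
public class CastToStringSketch {

  public static String castToString(Object value) {
    if (value == null) {
      return null;
    }
    if (value instanceof String) {
      return (String) value;
    }
    if (value instanceof byte[]) {
      // Bytes columns: decode as UTF-8.
      return new String((byte[]) value, StandardCharsets.UTF_8);
    }
    if (value instanceof CharSequence) {
      // Covers engine-specific string types such as Spark's UTF8String.
      return value.toString();
    }
    throw new IllegalArgumentException("Cannot cast " + value.getClass() + " to String");
  }
}
```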



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -304,6 +308,18 @@ public class HoodieTableConfig extends HoodieConfig {
       .sinceVersion("1.0.0")
      .withDocumentation("When set to true, the table can support reading and writing multiple base file formats.");
 
+  public static final ConfigProperty<PartialUpdateMode> PARIL_UPDATE_MODE = ConfigProperty

Review Comment:
   typo. 
   PARIL -> PARTIAL



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -304,6 +308,18 @@ public class HoodieTableConfig extends HoodieConfig {
       .sinceVersion("1.0.0")
      .withDocumentation("When set to true, the table can support reading and writing multiple base file formats.");
 
+  public static final ConfigProperty<PartialUpdateMode> PARIL_UPDATE_MODE = ConfigProperty
+      .key("hoodie.write.partial.update.mode")
+      .defaultValue(PartialUpdateMode.KEEP_VALUES)
+      .sinceVersion("1.1.0")
+      .withDocumentation("This property defines the merge behavior for standard merge mode");

Review Comment:
   ```
   This property, when set, defines how two versions of a record are merged when the latter contains only a partial set of values and not the entire record.
   ```
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -269,9 +277,13 @@ protected Option<BufferedRecord<T>> doProcessNextDataRecord(BufferedRecord<T> ne
       } else {
         switch (recordMergeMode) {
           case COMMIT_TIME_ORDERING:
+            updatePartiallyIfNeeded(

Review Comment:
   L247-276 is where we already do partial merging. 
   We should move all this logic in there, 
   or have a high-level switch case on merge mode, and for each merge mode handle both full merge and partial merge based on configs.
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -269,9 +277,13 @@ protected Option<BufferedRecord<T>> doProcessNextDataRecord(BufferedRecord<T> ne
       } else {
         switch (recordMergeMode) {
           case COMMIT_TIME_ORDERING:
+            updatePartiallyIfNeeded(
+                newRecord, existingRecord, readerSchema, readerSchema, partialUpdateMode);
             return Option.of(newRecord);
           case EVENT_TIME_ORDERING:
             if (shouldKeepNewerRecord(existingRecord, newRecord)) {
+              updatePartiallyIfNeeded(

Review Comment:
   do we know what the expected behavior is if the ordering field is missing w/ a partial update (new incoming record) and the EVENT_TIME_ORDERING based merge mode?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -304,6 +308,18 @@ public class HoodieTableConfig extends HoodieConfig {
       .sinceVersion("1.0.0")
      .withDocumentation("When set to true, the table can support reading and writing multiple base file formats.");
 
+  public static final ConfigProperty<PartialUpdateMode> PARIL_UPDATE_MODE = ConfigProperty
+      .key("hoodie.write.partial.update.mode")
+      .defaultValue(PartialUpdateMode.KEEP_VALUES)

Review Comment:
   do we even need a default value for this config? 
   if partial update is enabled, the table property will have to be set w/ the right value anyway. So, why bother setting a default value?
   



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java:
##########
@@ -178,6 +179,20 @@ public Object getValue(RowData record, Schema schema, String fieldName) {
     }
   }
 
+  @Override
+  public void setValue(RowData record, Schema schema, String fieldName, Object value) {
+    // Get the index of the field from Avro schema
+    Schema.Field field = schema.getField(fieldName);

Review Comment:
   are nested fields handled?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -331,6 +343,64 @@ protected Option<BufferedRecord<T>> doProcessNextDataRecord(BufferedRecord<T> ne
     }
   }
 
+  /**
+   * Merge records based on partial update mode.
+   * Note that {@param newRecord} refers to the record with higher commit time
+   * if COMMIT_TIME_ORDERING mode is used, or higher event time if EVENT_TIME_ORDERING mode is used.
+   */
+  private void updatePartiallyIfNeeded(BufferedRecord<T> newRecord,
+                                       BufferedRecord<T> oldRecord,
+                                       Schema newSchema,
+                                       Schema oldSchema,
+                                       PartialUpdateMode partialUpdateMode) {
+    if (newRecord.isDelete()) {
+      return;
+    }
+
+    List<Schema.Field> fields = newSchema.getFields();
+    switch (partialUpdateMode) {
+      case NONE:
+      case KEEP_VALUES:
+      case FILL_DEFAULTS:
+      case COLUMN_FAMILY:
+        // No-op for these modes
+        break;
+
+      case IGNORE_DEFAULTS:
+        for (Schema.Field field : fields) {

Review Comment:
   are we accounting for nested fields?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -854,11 +870,13 @@ public static RecordMergeMode inferRecordMergeModeFromPayloadClass(String payloa
       return null;
     }
     if (DefaultHoodieRecordPayload.class.getName().equals(payloadClassName)
-        || EventTimeAvroPayload.class.getName().equals(payloadClassName)) {
-      // DefaultHoodieRecordPayload and EventTimeAvroPayload match with EVENT_TIME_ORDERING.
+        || EventTimeAvroPayload.class.getName().equals(payloadClassName)

Review Comment:
   where are we accounting for table version? 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -455,8 +526,10 @@ protected Pair<Boolean, T> merge(BufferedRecord<T> olderRecord, BufferedRecord<T
           Comparable oldOrderingValue = olderRecord.getOrderingValue();
           if (!olderRecord.isCommitTimeOrderingDelete()
               && oldOrderingValue.compareTo(newOrderingValue) > 0) {
+            updatePartiallyIfNeeded(olderRecord, newerRecord, readerSchema, readerSchema, partialUpdateMode);

Review Comment:
   if not for partial updates, we end up picking the entire old record as-is. 
   w/ partial updates, I see we are switching the ordering here. but isn't the expectation of the `updatePartiallyIfNeeded` method that the first argument is likely the fully formed record, the 2nd argument contains only the partial update, and the method merges both records into newerRecord? 
   
   but if we switch the ordering, does the method impl know that newRecord is partial and oldRecord is fully formed, and act accordingly?
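   The concern above, that the method must know which argument is the partial record, can be illustrated with a direction-explicit helper. A minimal sketch over plain Java maps (all names hypothetical; null models a "not provided" field):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: merge a partial record into a fully formed base
// record, with the roles made explicit so callers cannot accidentally swap
// them when the older record wins on ordering value.
public class PartialMergeSketch {

  // Copy every provided field of the partial record over the base, keeping
  // the base's value for fields the partial record did not supply.
  public static Map<String, Object> mergePartialIntoBase(Map<String, Object> base,
                                                         Map<String, Object> partial) {
    Map<String, Object> merged = new HashMap<>(base);
    for (Map.Entry<String, Object> e : partial.entrySet()) {
      if (e.getValue() != null) {
        merged.put(e.getKey(), e.getValue());
      }
    }
    return merged;
  }
}
```

Whichever record wins on ordering, the caller always passes the fully formed record as `base` and the partial one as `partial`; swapping them silently drops updates, which is exactly the failure mode the comment is asking about.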



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -127,6 +129,7 @@ private HoodieFileGroupReader(HoodieReaderContext<T> readerContext, HoodieStorag
     HoodieTableConfig tableConfig = hoodieTableMetaClient.getTableConfig();
     this.partitionPath = fileSlice.getPartitionPath();
     this.partitionPathFields = tableConfig.getPartitionFields();
+    this.partialUpdateMode = tableConfig.getPartialUpdateMode();

Review Comment:
   for full record merge cases, either this should be Option<PartialUpdateMode> or we need a NONE PartialUpdateMode. 
   if not, what value would you set for this mode in full record merge use-cases?
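   Both options can be sketched in a few lines, using java.util.Optional and a hypothetical enum (none of these names are from the PR):

```java
import java.util.Optional;

// Hypothetical sketch of the two alternatives: an Optional-valued accessor,
// or an enum with an explicit NONE member for full record merge.
public class PartialUpdateModeSketch {

  public enum PartialUpdateMode { NONE, KEEP_VALUES, IGNORE_DEFAULTS, IGNORE_MARKERS }

  // Option A: full record merge (no table property set) yields empty.
  public static Optional<PartialUpdateMode> fromTableConfig(String configured) {
    return configured == null
        ? Optional.empty()
        : Optional.of(PartialUpdateMode.valueOf(configured));
  }

  // Option B: collapse the empty case to an explicit NONE mode, so downstream
  // merge code can switch on a single enum without null checks.
  public static PartialUpdateMode orNone(String configured) {
    return fromTableConfig(configured).orElse(PartialUpdateMode.NONE);
  }
}
```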



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -331,6 +343,64 @@ protected Option<BufferedRecord<T>> doProcessNextDataRecord(BufferedRecord<T> ne
     }
   }
 
+  /**
+   * Merge records based on partial update mode.
+   * Note that {@param newRecord} refers to the record with higher commit time
+   * if COMMIT_TIME_ORDERING mode is used, or higher event time if EVENT_TIME_ORDERING mode is used.
+   */
+  private void updatePartiallyIfNeeded(BufferedRecord<T> newRecord,
+                                       BufferedRecord<T> oldRecord,
+                                       Schema newSchema,
+                                       Schema oldSchema,
+                                       PartialUpdateMode partialUpdateMode) {
+    if (newRecord.isDelete()) {
+      return;
+    }
+
+    List<Schema.Field> fields = newSchema.getFields();
+    switch (partialUpdateMode) {
+      case NONE:
+      case KEEP_VALUES:
+      case FILL_DEFAULTS:
+      case COLUMN_FAMILY:
+        // No-op for these modes
+        break;
+
+      case IGNORE_DEFAULTS:
+        for (Schema.Field field : fields) {
+          String fieldName = field.name();
+          Object defaultValue = field.defaultVal();
+          Object newValue = readerContext.getValue(newRecord.getRecord(), newSchema, fieldName);
+          if (defaultValue == newValue) {
+            Object oldValue = readerContext.getValue(oldRecord.getRecord(), oldSchema, fieldName);
+            readerContext.setValue(
+                newRecord.getRecord(), newSchema, fieldName, oldValue);
+          }
+        }
+        break;
+
+      case IGNORE_MARKERS:
+        if (partialUpdateCustomMarkerValue == null) {
+          throw new IllegalStateException(
+              "For 'IGNORE_MARKERS' mode, custom marker value must be defined");
+        }
+        for (Schema.Field field : fields) {
+          String fieldName = field.name();
+          Object newValue = readerContext.getValue(newRecord.getRecord(), newSchema, fieldName);
+          if (partialUpdateCustomMarkerValue.equals(readerContext.castToString(newValue))) {

Review Comment:
   we should only do this for `String` and `Bytes` data types. check out 
https://github.com/apache/hudi/blob/33a7b845bcd30f823d026b9f19e20554fe3cf10e/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/PostgresDebeziumAvroPayload.java#L140
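   Mirroring the approach in the linked payload class, the marker comparison could be restricted to string- and bytes-typed values. A hedged self-contained sketch (names hypothetical; the marker is passed in rather than read from config):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hedged sketch: only String- and bytes-typed values can legitimately carry a
// textual marker, so the comparison is skipped for every other type instead of
// stringifying arbitrary values before comparing.
public class MarkerCheckSketch {

  public static boolean isMarkerValue(Object value, String marker) {
    if (value == null || marker == null) {
      return false;
    }
    if (value instanceof CharSequence) {
      // String columns (including engine-specific string types).
      return marker.contentEquals((CharSequence) value);
    }
    if (value instanceof byte[]) {
      // Bytes columns: compare against the marker's UTF-8 encoding.
      return Arrays.equals((byte[]) value, marker.getBytes(StandardCharsets.UTF_8));
    }
    // Numeric, boolean, nested types etc. can never equal a textual marker.
    return false;
  }
}
```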
 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -331,6 +343,64 @@ protected Option<BufferedRecord<T>> doProcessNextDataRecord(BufferedRecord<T> ne
     }
   }
 
+  /**
+   * Merge records based on partial update mode.
+   * Note that {@param newRecord} refers to the record with higher commit time
+   * if COMMIT_TIME_ORDERING mode is used, or higher event time if EVENT_TIME_ORDERING mode is used.
+   */
+  private void updatePartiallyIfNeeded(BufferedRecord<T> newRecord,
+                                       BufferedRecord<T> oldRecord,
+                                       Schema newSchema,
+                                       Schema oldSchema,
+                                       PartialUpdateMode partialUpdateMode) {
+    if (newRecord.isDelete()) {
+      return;
+    }
+
+    List<Schema.Field> fields = newSchema.getFields();
+    switch (partialUpdateMode) {
+      case NONE:
+      case KEEP_VALUES:
+      case FILL_DEFAULTS:
+      case COLUMN_FAMILY:
+        // No-op for these modes
+        break;
+
+      case IGNORE_DEFAULTS:
+        for (Schema.Field field : fields) {
+          String fieldName = field.name();
+          Object defaultValue = field.defaultVal();

Review Comment:
   and if a field is nested, it may not have any default value, right?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -269,9 +277,13 @@ protected Option<BufferedRecord<T>> doProcessNextDataRecord(BufferedRecord<T> ne
       } else {
         switch (recordMergeMode) {
           case COMMIT_TIME_ORDERING:
+            updatePartiallyIfNeeded(
+                newRecord, existingRecord, readerSchema, readerSchema, partialUpdateMode);
             return Option.of(newRecord);
           case EVENT_TIME_ORDERING:
             if (shouldKeepNewerRecord(existingRecord, newRecord)) {
+              updatePartiallyIfNeeded(

Review Comment:
   even if we are not picking the new record, i.e. the old record has a higher ordering value compared to the new record, is it safe to completely ignore the new incoming partial record? 
   or do we need to apply the merging in reverse order?



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java:
##########
@@ -206,6 +206,18 @@ public Object getValue(ArrayWritable record, Schema schema, String fieldName) {
     return getFieldValueFromArrayWritable(record, schema, fieldName, objectInspectorCache);
   }
 
+  @Override
+  public void setValue(ArrayWritable record, Schema schema, String fieldName, Object value) {
+    Schema.Field field = schema.getField(fieldName);

Review Comment:
   are nested fields handled?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
