michael1991 commented on issue #7988:
URL: https://github.com/apache/hudi/issues/7988#issuecomment-1436228601

Hi @yihua, thanks for your quick response! Sorry for the late reply over the weekend.

My custom payload implementation looks like this:
```java
package com.x.y.z.payload;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;

import java.io.IOException;
import java.util.List;
import java.util.Properties;

import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS;

/**
 * Custom Payload Class to Process Incremental Log
 */
public class CustomPayload extends OverwriteWithLatestAvroPayload {

    public CustomPayload(GenericRecord record, Comparable orderingVal) {
        super(record, orderingVal);
    }

    public CustomPayload(Option<GenericRecord> record) {
        super(record);
    }

    @Override
    public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
        Option<IndexedRecord> recordOption = getInsertValue(schema);
        if (!recordOption.isPresent()) {
            return Option.empty();
        }

        GenericRecord insertRecord = (GenericRecord) recordOption.get();
        GenericRecord currentRecord = (GenericRecord) currentValue;

        if (isDeleteRecord(insertRecord)) {
            return Option.empty();
        } else {
            // Build the merged record field by field using the incoming schema
            final GenericRecordBuilder builder = new GenericRecordBuilder(schema);
            List<Schema.Field> fields = schema.getFields();
            for (Schema.Field field : fields) {
                String fieldName = field.name();
                Object insertFieldValue = insertRecord.get(fieldName);
                // Positional lookup: assumes the stored record has the same field order as `schema`
                Object currentFieldValue = currentRecord.get(field.pos());

                switch (fieldName) {
                    case "double_price_1":
                    case "double_price_2":
                        // Keep the lower non-null price
                        Double insertPrice = toDouble(insertFieldValue);
                        Double currentPrice = toDouble(currentFieldValue);
                        if (insertPrice != null && (currentPrice == null || insertPrice < currentPrice)) {
                            builder.set(field, insertFieldValue);
                        } else {
                            builder.set(field, currentFieldValue);
                        }
                        break;
                    case "int_flag_1":
                    case "int_flag_2":
                        // A flag only switches from 0 to 1, never back
                        int insertFlag = Integer.parseInt(insertFieldValue.toString());
                        int currentFlag = Integer.parseInt(currentFieldValue.toString());
                        builder.set(field, currentFlag == 0 && insertFlag == 1 ? insertFieldValue : currentFieldValue);
                        break;
                    case "long_time_1":
                    case "long_time_2":
                        // Keep the earlier non-null timestamp
                        Long insertTime = toLong(insertFieldValue);
                        Long currentTime = toLong(currentFieldValue);
                        if (insertTime != null && (currentTime == null || insertTime < currentTime)) {
                            builder.set(field, insertFieldValue);
                        } else {
                            builder.set(field, currentFieldValue);
                        }
                        break;
                    default:
                        // Hudi meta columns always come from the incoming record;
                        // other columns take the incoming value only when the stored value is null
                        if (HOODIE_META_COLUMNS_NAME_TO_POS.containsKey(fieldName) || (insertFieldValue != null && currentFieldValue == null)) {
                            builder.set(field, insertFieldValue);
                        } else {
                            builder.set(field, currentFieldValue);
                        }
                        break;
                }
            }
            return Option.of(builder.build());
        }
    }

    // Returns null instead of throwing for null or non-numeric values
    private Double toDouble(Object object) {
        if (object == null) {
            return null;
        } else {
            try {
                return Double.parseDouble(object.toString());
            } catch (Exception exception) {
                return null;
            }
        }
    }

    // Returns null instead of throwing for null or non-numeric values
    private Long toLong(Object object) {
        if (object == null) {
            return null;
        } else {
            try {
                return Long.parseLong(object.toString());
            } catch (Exception exception) {
                return null;
            }
        }
    }

}
```
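One detail that may be worth checking in the payload above: the `int_flag_*` case calls `insertFieldValue.toString()` and `currentFieldValue.toString()` directly, so a null flag on either side would throw a `NullPointerException`, whereas the price and time cases go through the null-tolerant `toDouble`/`toLong` helpers. Below is a minimal sketch of a null-tolerant flag rule in the same style. `FlagMerge`, `toInteger` and `mergeFlag` are hypothetical names, and the handling of missing values (a missing incoming flag never overwrites the stored one; a missing stored flag is filled from the incoming record) is an assumption that may not match the intended semantics.

```java
/**
 * Hypothetical null-tolerant variant of the int_flag_* rule in CustomPayload.
 * toInteger follows the same pattern as the existing toDouble/toLong helpers.
 */
final class FlagMerge {

    private FlagMerge() {
    }

    /** Parses a flag value, returning null instead of throwing for null or non-numeric input. */
    static Integer toInteger(Object object) {
        if (object == null) {
            return null;
        }
        try {
            return Integer.parseInt(object.toString());
        } catch (NumberFormatException exception) {
            return null;
        }
    }

    /**
     * Keeps the original rule (switch only when the stored flag is 0 and the incoming flag is 1)
     * and adds two assumed rules for missing values: a missing incoming flag never overwrites
     * the stored one, and a missing stored flag is filled from the incoming record.
     */
    static Object mergeFlag(Object insertFieldValue, Object currentFieldValue) {
        Integer insertFlag = toInteger(insertFieldValue);
        Integer currentFlag = toInteger(currentFieldValue);
        if (insertFlag == null) {
            return currentFieldValue;
        }
        if (currentFlag == null) {
            return insertFieldValue;
        }
        return currentFlag == 0 && insertFlag == 1 ? insertFieldValue : currentFieldValue;
    }
}
```

Inside the `switch`, the flag case would then reduce to `builder.set(field, FlagMerge.mergeFlag(insertFieldValue, currentFieldValue));`.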
**Sample schema**: key, ts, int_flag_1, double_price_1, long_time_1, int_flag_2, double_price_2, long_time_2

- flag: int type, either 0 or 1
- price: double type
- long_time: timestamp
- key: record key
- ts: precombine column, though our scenario doesn't actually need it
   
Unfortunately, I can't share sample data or a unit test.
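Since no sample data can be shared, a stdlib-only model of the field rules applied to the sample schema can serve as a quick sanity check without Avro or Hudi on the classpath. This is a sketch, not the payload itself: `MergeRulesSketch` and `merge` are hypothetical names, a row is modelled as a `Map<String, Object>` keyed by column name instead of a `GenericRecord`, Hudi meta columns and delete markers are left out, and the flag rule here compares `Integer` values, so it treats a null flag as "no change" rather than throwing.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical, stdlib-only model of the per-field rules in CustomPayload.
 * A row is a Map keyed by column name instead of an Avro GenericRecord;
 * Hudi meta columns and delete markers are deliberately left out.
 */
final class MergeRulesSketch {

    private MergeRulesSketch() {
    }

    /** Merges an incoming row into the stored row, iterating over the incoming row's columns. */
    static Map<String, Object> merge(Map<String, Object> current, Map<String, Object> insert) {
        Map<String, Object> merged = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : insert.entrySet()) {
            String name = entry.getKey();
            Object insertValue = entry.getValue();
            Object currentValue = current.get(name);
            switch (name) {
                case "double_price_1":
                case "double_price_2":
                    // Lowest non-null price wins (assumes both values are Double).
                    merged.put(name, smallerNonNull((Double) insertValue, (Double) currentValue));
                    break;
                case "long_time_1":
                case "long_time_2":
                    // Earliest non-null timestamp wins (assumes both values are Long).
                    merged.put(name, smallerNonNull((Long) insertValue, (Long) currentValue));
                    break;
                case "int_flag_1":
                case "int_flag_2":
                    // A flag only moves from 0 to 1, never back (assumes Integer values).
                    boolean flip = Integer.valueOf(0).equals(currentValue)
                            && Integer.valueOf(1).equals(insertValue);
                    merged.put(name, flip ? insertValue : currentValue);
                    break;
                default:
                    // Any other column (including key and ts): keep the stored value unless it is null.
                    merged.put(name, currentValue == null ? insertValue : currentValue);
                    break;
            }
        }
        return merged;
    }

    /** Returns the smaller of two values, treating null as "missing" rather than as a minimum. */
    private static <T extends Comparable<T>> T smallerNonNull(T insertValue, T currentValue) {
        if (insertValue == null) {
            return currentValue;
        }
        if (currentValue == null) {
            return insertValue;
        }
        return insertValue.compareTo(currentValue) < 0 ? insertValue : currentValue;
    }
}
```

For example, with a stored row where `int_flag_1 = 0`, `double_price_1 = 9.5`, `long_time_1 = 100L`, `ts = 1L` and an incoming row with `1`, `12.0`, `50L`, `2L`, `merge` keeps `1`, `9.5`, `50L` and `ts = 1L`: `ts` falls through to the `default` rule, so the stored value is kept.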
   
BTW, I had looked at [PartialUpdateAvroPayload](https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java) before, but our scenario needs more flexible update rules, so we tried to implement our own.

