This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b9ae7e00c3 [HUDI-7177] Test custom merger for MOR tables (#10302)
0b9ae7e00c3 is described below

commit 0b9ae7e00c3c1b4919a855ac9646e8a5f6448a5d
Author: Lin Liu <[email protected]>
AuthorDate: Wed Dec 20 17:47:29 2023 -0800

    [HUDI-7177] Test custom merger for MOR tables (#10302)
---
 .../read/HoodieBaseFileGroupRecordBuffer.java      |   4 +-
 .../hudi/common/table/read/TestCustomMerger.java   | 236 +++++++++++++++++++++
 .../common/table/read/TestEventTimeMerging.java    | 177 ++++++++++++++++
 .../reader/HoodieAvroRecordTestMerger.java         |  81 +++++++
 .../reader/HoodieFileGroupReaderTestHarness.java   |   9 +-
 .../testutils/reader/HoodieRecordTestPayload.java  | 202 ++++++++++++++++++
 .../testutils/reader/HoodieTestReaderContext.java  |  16 +-
 7 files changed, 719 insertions(+), 6 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index f22e8f221f3..2f695cf0249 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -259,7 +259,9 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
           readerContext.constructHoodieRecord(older, olderInfoMap), (Schema) 
olderInfoMap.get(INTERNAL_META_SCHEMA),
           readerContext.constructHoodieRecord(newer, newerInfoMap), (Schema) 
newerInfoMap.get(INTERNAL_META_SCHEMA), payloadProps);
     }
-    if (mergedRecord.isPresent()) {
+
+    if (mergedRecord.isPresent()
+        && 
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), 
payloadProps)) {
       return Option.ofNullable((T) mergedRecord.get().getLeft().getData());
     }
     return Option.empty();
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
new file mode 100644
index 00000000000..3e80d4bee56
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import 
org.apache.hudi.common.testutils.reader.HoodieFileGroupReaderTestHarness;
+import org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils;
+import org.apache.hudi.common.testutils.reader.HoodieRecordTestPayload;
+import org.apache.hudi.common.testutils.reader.HoodieTestReaderContext;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hudi.common.model.HoodieRecord.HoodieRecordType.AVRO;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.DELETE;
+import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.INSERT;
+import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.UPDATE;
+import static 
org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils.ROW_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestCustomMerger extends HoodieFileGroupReaderTestHarness {
+  @BeforeAll
+  public static void setUp() throws IOException {
+    // Enable our custom merger.
+    readerContext = new HoodieTestReaderContext(
+        Option.of(new CustomAvroMerger()),
+        Option.of(HoodieRecordTestPayload.class.getName()));
+
+    // -------------------------------------------------------------
+    // The test logic is as follows:
+    // 1. Base file contains 10 records,
+    //    whose key values are from 1 to 10,
+    //    whose instant time is "001" and ordering value is 2.
+    // 2. After adding the first log file,
+    //    we delete the records with keys from 1 to 5
+    //    with ordering value 3. Since the rest of the records do
+    //    not go through the merger, they are kept as they are.
+    //    Current existing keys: [6, 7, 8, 9, 10]
+    // 3. After adding the second log file,
+    //    we tried to add the records with keys from 1 to 3 back,
+    //    and we did it since their ordering value is 4 > 3, with
+    //    the merge and flush function, only 1, 3 stay. Records with
+    //    keys from 6 to 10 remain as they are.
+    //    Current existing keys: [1, 3, 6, 7, 8, 9, 10]
+    // 4. After adding the third log file,
+    //    we tried to delete records with keys from 6 to 8,
+    //    but we cannot since their ordering value is 1 < 2.
+    //    This step brings records from 6 to 8 into the merger and flush,
+    //    and only record with key 7 left.
+    //    Current existing keys: [1, 3, 7, 9, 10]
+    // 5. After adding the fourth log file,
+    //    we update the records with keys from 1 to 10
+    //    with ordering value 9; since the merger only keeps
+    //    records with odd keys, the even keys are dropped.
+    //    Current existing keys: [1, 3, 5, 7, 9]
+    // -------------------------------------------------------------
+
+    keyRanges = Arrays.asList(
+        new HoodieFileSliceTestUtils.KeyRange(1, 10),
+        new HoodieFileSliceTestUtils.KeyRange(1, 5),
+        new HoodieFileSliceTestUtils.KeyRange(1, 3),
+        new HoodieFileSliceTestUtils.KeyRange(6, 8),
+        new HoodieFileSliceTestUtils.KeyRange(1, 10));
+    timestamps = Arrays.asList(
+        2L, 3L, 4L, 1L, 9L);
+    operationTypes = Arrays.asList(
+        INSERT, DELETE, UPDATE, DELETE, UPDATE);
+    instantTimes = Arrays.asList(
+        "001", "002", "003", "004", "005");
+  }
+
+  @BeforeEach
+  public void initialize() throws Exception {
+    setTableName(TestCustomMerger.class.getName());
+    initPath(tableName);
+    initMetaClient();
+    initTestDataGenerator(new String[]{PARTITION_PATH});
+    testTable = HoodieTestTable.of(metaClient);
+    setUpMockCommits();
+  }
+
+  @Test
+  public void testWithOneLogFile() throws IOException, InterruptedException {
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2);
+    List<String> leftKeysExpected =
+        Arrays.asList("6", "7", "8", "9", "10");
+    List<String> leftKeysActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      leftKeysActual.add(iterator.next()
+          .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+          .toString());
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+  }
+
+  @Test
+  public void testWithTwoLogFiles() throws IOException, InterruptedException {
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(3);
+    List<String> leftKeysExpected =
+        Arrays.asList("1", "3", "6", "7", "8", "9", "10");
+    List<String> leftKeysActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      leftKeysActual.add(iterator.next()
+          .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+          .toString());
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+  }
+
+  @Test
+  public void testWithThreeLogFiles() throws IOException, InterruptedException 
{
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4);
+    List<String> leftKeysExpected =
+        Arrays.asList("1", "3", "7", "9", "10");
+    List<String> leftKeysActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      leftKeysActual.add(iterator.next()
+          .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+          .toString());
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+  }
+
+  @Test
+  public void testWithFourLogFiles() throws IOException, InterruptedException {
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(5);
+    List<String> leftKeysExpected =
+        Arrays.asList("1", "3", "5", "7", "9");
+    List<String> leftKeysActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      leftKeysActual.add(iterator.next()
+          .get(AVRO_SCHEMA.getField(ROW_KEY).pos())
+          .toString());
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+  }
+
+  /**
+   * This merger is designed to save records whose record key is odd.
+   * That means, if the record is not a delete record, and its record
+   * key is odd, then it will be output. Before the flush stage, we only
+   * flush records whose timestamp is a multiple of 3; but since the write
+   * is an insert operation, it will not go through the flush stage. Therefore,
+   * in this test, we solely test the merge function.
+   */
+  public static class CustomAvroMerger implements HoodieRecordMerger {
+    public static final String KEEP_CERTAIN_TIMESTAMP_VALUE_ONLY =
+        "KEEP_CERTAIN_TIMESTAMP_VALUE_ONLY";
+    public static final String TIMESTAMP = "timestamp";
+
+    @Override
+    public Option<Pair<HoodieRecord, Schema>> merge(
+        HoodieRecord older,
+        Schema oldSchema,
+        HoodieRecord newer,
+        Schema newSchema,
+        TypedProperties props
+    ) throws IOException {
+      if (newer.getOrderingValue(newSchema, props).compareTo(
+          older.getOrderingValue(oldSchema, props)) >= 0) {
+        if (newer.isDelete(newSchema, props)) {
+          return Option.empty();
+        }
+        int id = Integer.parseInt(((HoodieAvroIndexedRecord) newer)
+            .getData().get(newSchema.getField(ROW_KEY).pos()).toString());
+        if (id % 2 == 1L) {
+          return Option.of(Pair.of(newer, newSchema));
+        }
+      } else {
+        if (older.isDelete(oldSchema, props)) {
+          return Option.empty();
+        }
+        int id = Integer.parseInt(((HoodieAvroIndexedRecord) older)
+            .getData().get(oldSchema.getField(ROW_KEY).pos()).toString());
+        if (id % 2 == 1L) {
+          return Option.of(Pair.of(older, oldSchema));
+        }
+      }
+      return Option.empty();
+    }
+
+    @Override
+    public boolean shouldFlush(
+        HoodieRecord record,
+        Schema schema,
+        TypedProperties props
+    ) {
+      long timestamp = (long) ((HoodieAvroIndexedRecord) record)
+          .getData()
+          .get(schema.getField(TIMESTAMP).pos());
+      return timestamp % 3 == 0L;
+    }
+
+    @Override
+    public HoodieRecord.HoodieRecordType getRecordType() {
+      return AVRO;
+    }
+
+    @Override
+    public String getMergingStrategy() {
+      return KEEP_CERTAIN_TIMESTAMP_VALUE_ONLY;
+    }
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
new file mode 100644
index 00000000000..bf0fac19c67
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.testutils.reader.HoodieAvroRecordTestMerger;
+import 
org.apache.hudi.common.testutils.reader.HoodieFileGroupReaderTestHarness;
+import org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils;
+import org.apache.hudi.common.testutils.reader.HoodieRecordTestPayload;
+import org.apache.hudi.common.testutils.reader.HoodieTestReaderContext;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.DELETE;
+import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.INSERT;
+import static 
org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.UPDATE;
+import static 
org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils.ROW_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestEventTimeMerging extends HoodieFileGroupReaderTestHarness {
+  @BeforeAll
+  public static void setUp() throws IOException {
+    // Create dedicated merger to avoid current delete logic holes.
+    // TODO: Unify delete logic (HUDI-7240).
+    HoodieAvroRecordMerger merger = new HoodieAvroRecordTestMerger();
+    readerContext = new HoodieTestReaderContext(
+        Option.of(merger),
+        Option.of(HoodieRecordTestPayload.class.getName()));
+
+    // -------------------------------------------------------------
+    // The test logic is as follows:
+    // 1. Base file contains 10 records,
+    //    whose key values are from 1 to 10,
+    //    whose instant time is "001" and ordering value is 2.
+    // 2. After adding the first log file,
+    //    we delete the records with keys from 1 to 5
+    //    with ordering value 3.
+    //    Current existing keys: [6, 7, 8, 9, 10]
+    // 3. After adding the second log file,
+    //    we tried to add the records with keys from 1 to 3 back,
+    //    but we cannot since their ordering value is 1 < 3.
+    //    Current existing keys: [6, 7, 8, 9, 10]
+    // 4. After adding the third log file,
+    //    we tried to delete records with keys from 6 to 8,
+    //    but we cannot since their ordering value is 1 < 2.
+    //    Current existing keys: [6, 7, 8, 9, 10]
+    // 5. After adding the fourth log file,
+    //    we tried to add the records with keys from 1 to 2 back,
+    //    and it worked since their ordering value is 4 > 3.
+    //    Current existing keys: [1, 2, 6, 7, 8, 9, 10]
+    // -------------------------------------------------------------
+
+    // Specify the key column values for each file.
+    keyRanges = Arrays.asList(
+        new HoodieFileSliceTestUtils.KeyRange(1, 10),
+        new HoodieFileSliceTestUtils.KeyRange(1, 5),
+        new HoodieFileSliceTestUtils.KeyRange(1, 3),
+        new HoodieFileSliceTestUtils.KeyRange(6, 8),
+        new HoodieFileSliceTestUtils.KeyRange(1, 2));
+    // Specify the value of `timestamp` column for each file.
+    timestamps = Arrays.asList(
+        2L, 3L, 1L, 1L, 4L);
+    // Specify the operation type for each file.
+    operationTypes = Arrays.asList(
+        INSERT, DELETE, UPDATE, DELETE, UPDATE);
+    // Specify the instant time for each file.
+    instantTimes = Arrays.asList(
+        "001", "002", "003", "004", "005");
+  }
+
+  @BeforeEach
+  public void initialize() throws Exception {
+    setTableName(TestEventTimeMerging.class.getName());
+    initPath(tableName);
+    initMetaClient();
+    initTestDataGenerator(new String[]{PARTITION_PATH});
+    testTable = HoodieTestTable.of(metaClient);
+    setUpMockCommits();
+  }
+
+  @Test
+  public void testWithOneLogFile() throws IOException, InterruptedException {
+    // The FileSlice contains a base file and a log file.
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(2);
+    List<String> leftKeysExpected = Arrays.asList("6", "7", "8", "9", "10");
+    List<Long> leftTimestampsExpected = Arrays.asList(2L, 2L, 2L, 2L, 2L);
+    List<String> leftKeysActual = new ArrayList<>();
+    List<Long> leftTimestampsActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      IndexedRecord record = iterator.next();
+      
leftKeysActual.add(record.get(AVRO_SCHEMA.getField(ROW_KEY).pos()).toString());
+      leftTimestampsActual.add((Long) 
record.get(AVRO_SCHEMA.getField("timestamp").pos()));
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+    assertEquals(leftTimestampsExpected, leftTimestampsActual);
+  }
+
+  @Test
+  public void testWithTwoLogFiles() throws IOException, InterruptedException {
+    // The FileSlice contains a base file and two log files.
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(3);
+    List<String> leftKeysExpected = Arrays.asList("6", "7", "8", "9", "10");
+    List<Long> leftTimestampsExpected = Arrays.asList(2L, 2L, 2L, 2L, 2L);
+    List<String> leftKeysActual = new ArrayList<>();
+    List<Long> leftTimestampsActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      IndexedRecord record = iterator.next();
+      
leftKeysActual.add(record.get(AVRO_SCHEMA.getField(ROW_KEY).pos()).toString());
+      leftTimestampsActual.add((Long) 
record.get(AVRO_SCHEMA.getField("timestamp").pos()));
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+    assertEquals(leftTimestampsExpected, leftTimestampsActual);
+  }
+
+  @Test
+  public void testWithThreeLogFiles() throws IOException, InterruptedException 
{
+    // The FileSlice contains a base file and three log files.
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(4);
+    List<String> leftKeysExpected = Arrays.asList("6", "7", "8", "9", "10");
+    List<Long> leftTimestampsExpected = Arrays.asList(2L, 2L, 2L, 2L, 2L);
+    List<String> leftKeysActual = new ArrayList<>();
+    List<Long> leftTimestampsActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      IndexedRecord record = iterator.next();
+      
leftKeysActual.add(record.get(AVRO_SCHEMA.getField(ROW_KEY).pos()).toString());
+      leftTimestampsActual.add((Long) 
record.get(AVRO_SCHEMA.getField("timestamp").pos()));
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+    assertEquals(leftTimestampsExpected, leftTimestampsActual);
+  }
+
+  @Test
+  public void testWithFourLogFiles() throws IOException, InterruptedException {
+    // The FileSlice contains a base file and four log files.
+    ClosableIterator<IndexedRecord> iterator = getFileGroupIterator(5);
+    List<String> leftKeysExpected = Arrays.asList("1", "2", "6", "7", "8", 
"9", "10");
+    List<Long> leftTimestampsExpected = Arrays.asList(4L, 4L, 2L, 2L, 2L, 2L, 
2L);
+    List<String> leftKeysActual = new ArrayList<>();
+    List<Long> leftTimestampsActual = new ArrayList<>();
+    while (iterator.hasNext()) {
+      IndexedRecord record = iterator.next();
+      
leftKeysActual.add(record.get(AVRO_SCHEMA.getField(ROW_KEY).pos()).toString());
+      leftTimestampsActual.add((Long) 
record.get(AVRO_SCHEMA.getField("timestamp").pos()));
+    }
+    assertEquals(leftKeysExpected, leftKeysActual);
+    assertEquals(leftTimestampsExpected, leftTimestampsActual);
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieAvroRecordTestMerger.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieAvroRecordTestMerger.java
new file mode 100644
index 00000000000..9bb69b6ceb0
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieAvroRecordTestMerger.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils.reader;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.Properties;
+
+public class HoodieAvroRecordTestMerger extends HoodieAvroRecordMerger {
+  @Override
+  public Option<Pair<HoodieRecord, Schema>> merge(
+      HoodieRecord older,
+      Schema oldSchema,
+      HoodieRecord newer,
+      Schema newSchema,
+      TypedProperties props
+  ) throws IOException {
+    Comparable oldOrderingVal = older.getOrderingValue(oldSchema, props);
+    Comparable newOrderingVal = newer.getOrderingValue(newSchema, props);
+
+    // The record with higher ordering value is returned.
+    if (oldOrderingVal == null || newOrderingVal.compareTo(oldOrderingVal) > 
0) {
+      return Option.of(Pair.of(newer, newSchema));
+    } else if (newOrderingVal.compareTo(oldOrderingVal) < 0) {
+      return Option.of(Pair.of(older, oldSchema));
+    }
+
+    // When their orderings are the same, we rely on the logic of the payload.
+    return combineAndGetUpdateValue(older, newer, newSchema, props)
+        .map(r -> Pair.of(new HoodieAvroIndexedRecord(r), r.getSchema()));
+  }
+
+  private Option<IndexedRecord> combineAndGetUpdateValue(
+      HoodieRecord older,
+      HoodieRecord newer,
+      Schema schema,
+      Properties props
+  ) throws IOException {
+    Option<IndexedRecord> previousAvroData = older
+        .toIndexedRecord(schema, props)
+        .map(HoodieAvroIndexedRecord::getData);
+
+    if (!previousAvroData.isPresent()) {
+      return newer
+          .toIndexedRecord(schema, props)
+          .map(HoodieAvroIndexedRecord::getData);
+    }
+
+    return ((HoodieAvroRecord) newer)
+        .getData()
+        .combineAndGetUpdateValue(
+            previousAvroData.get(), schema, props);
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
index 793a23bea51..c5dea3c675d 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestTable;
@@ -37,6 +38,7 @@ import org.junit.jupiter.api.AfterAll;
 import java.io.IOException;
 import java.util.List;
 
+import static 
org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultHadoopConf;
 
@@ -64,7 +66,8 @@ public class HoodieFileGroupReaderTestHarness extends 
HoodieCommonTestHarness {
     properties.setProperty(
         "hoodie.datasource.write.precombine.field", "timestamp");
     hadoopConf = getDefaultHadoopConf();
-    readerContext = new HoodieTestReaderContext(Option.empty());
+    readerContext = new HoodieTestReaderContext(
+        Option.empty(), Option.empty());
   }
 
   @AfterAll
@@ -102,6 +105,8 @@ public class HoodieFileGroupReaderTestHarness extends 
HoodieCommonTestHarness {
             FILE_ID
         );
 
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    tableConfig.setValue(POPULATE_META_FIELDS, "false");
     HoodieFileGroupReader<IndexedRecord> fileGroupReader =
         HoodieFileGroupReaderTestUtils.createFileGroupReader(
             fileSliceOpt,
@@ -113,7 +118,7 @@ public class HoodieFileGroupReaderTestHarness extends 
HoodieCommonTestHarness {
             Long.MAX_VALUE,
             properties,
             hadoopConf,
-            metaClient.getTableConfig(),
+            tableConfig,
             readerContext
         );
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieRecordTestPayload.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieRecordTestPayload.java
new file mode 100644
index 00000000000..bff49a025fa
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieRecordTestPayload.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils.reader;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodiePayloadProps;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hudi.common.util.ConfigUtils.getOrderingField;
+
+public class HoodieRecordTestPayload extends OverwriteWithLatestAvroPayload {
+  public static final String METADATA_EVENT_TIME_KEY = 
"metadata.event_time.key";
+  public static final String DELETE_KEY = "hoodie.payload.delete.field";
+  public static final String DELETE_MARKER = "hoodie.payload.delete.marker";
+  private Option<Object> eventTime = Option.empty();
+  private AtomicBoolean isDeleteComputed = new AtomicBoolean(false);
+  private boolean isDefaultRecordPayloadDeleted = false;
+
+  public HoodieRecordTestPayload(GenericRecord record, Comparable orderingVal) 
{
+    super(record, orderingVal);
+  }
+
+  @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema, Properties properties) throws IOException {
+    // The new record is a delete record.
+    if (isDeleted(schema, properties)) {
+      String orderingField = getOrderingField(properties);
+      // If orderingField cannot be found, we can not do the compare, then use 
the natural order.
+      if (orderingField == null) {
+        return Option.empty();
+      }
+
+      // Otherwise, we compare their ordering values.
+      Comparable<?> currentOrderingVal = (Comparable<?>) 
currentValue.get(currentValue.getSchema().getField(orderingField).pos());
+      if (orderingVal.compareTo(currentOrderingVal) >= 0) {
+        return Option.empty();
+      }
+      return Option.of(currentValue);
+    }
+
+    // If the new record is not a delete record.
+    GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, 
schema);
+
+    // Null check is needed here to support schema evolution. The record in 
storage may be from old schema where
+    // the new ordering column might not be present and hence returns null.
+    if (!needUpdatingPersistedRecord(currentValue, incomingRecord, 
properties)) {
+      return Option.of(currentValue);
+    }
+
+    /*
+     * We reached a point where the value on disk is older than the incoming 
record.
+     */
+    eventTime = updateEventTime(incomingRecord, properties);
+
+    if (!isDeleteComputed.getAndSet(true)) {
+      isDefaultRecordPayloadDeleted = isDeleteRecord(incomingRecord, 
properties);
+    }
+    /*
+     * Now check if the incoming record is a delete record.
+     */
+    return isDefaultRecordPayloadDeleted ? Option.empty() : 
Option.of(incomingRecord);
+  }
+
+  @Override
+  public Option<IndexedRecord> getInsertValue(Schema schema, Properties 
properties) throws IOException {
+    if (recordBytes.length == 0) {
+      return Option.empty();
+    }
+    GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, 
schema);
+    eventTime = updateEventTime(incomingRecord, properties);
+
+    if (!isDeleteComputed.getAndSet(true)) {
+      isDefaultRecordPayloadDeleted = isDeleteRecord(incomingRecord, 
properties);
+    }
+    return isDefaultRecordPayloadDeleted ? Option.empty() : 
Option.of(incomingRecord);
+  }
+
+  public boolean isDeleted(Schema schema, Properties props) {
+    if (recordBytes.length == 0) {
+      return true;
+    }
+    try {
+      if (!isDeleteComputed.getAndSet(true)) {
+        GenericRecord incomingRecord = 
HoodieAvroUtils.bytesToAvro(recordBytes, schema);
+        isDefaultRecordPayloadDeleted = isDeleteRecord(incomingRecord, props);
+      }
+      return isDefaultRecordPayloadDeleted;
+    } catch (IOException e) {
+      throw new HoodieIOException("Deserializing bytes to avro failed ", e);
+    }
+  }
+
+  /**
+   * @param genericRecord instance of {@link GenericRecord} of interest.
+   * @param properties payload related properties
+   * @return {@code true} if record represents a delete record. {@code false} 
otherwise.
+   */
+  protected boolean isDeleteRecord(GenericRecord genericRecord, Properties 
properties) {
+    final String deleteKey = properties.getProperty(DELETE_KEY);
+    if (StringUtils.isNullOrEmpty(deleteKey)) {
+      return isDeleteRecord(genericRecord);
+    }
+
+    
ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(properties.getProperty(DELETE_MARKER)),
+        () -> DELETE_MARKER + " should be configured with " + DELETE_KEY);
+    // Modify to be compatible with new version Avro.
+    // The new version Avro throws for GenericRecord.get if the field name
+    // does not exist in the schema.
+    if (genericRecord.getSchema().getField(deleteKey) == null) {
+      return false;
+    }
+    Object deleteMarker = genericRecord.get(deleteKey);
+    return deleteMarker != null && 
properties.getProperty(DELETE_MARKER).equals(deleteMarker.toString());
+  }
+
+  private static Option<Object> updateEventTime(GenericRecord record, 
Properties properties) {
+    boolean consistentLogicalTimestampEnabled = 
Boolean.parseBoolean(properties.getProperty(
+        
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+        
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
+    String eventTimeField = properties
+        .getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY);
+    if (eventTimeField == null) {
+      return Option.empty();
+    }
+    return Option.ofNullable(
+        HoodieAvroUtils.getNestedFieldVal(
+            record,
+            eventTimeField,
+            true,
+            consistentLogicalTimestampEnabled)
+    );
+  }
+
+  @Override
+  public Option<Map<String, String>> getMetadata() {
+    Map<String, String> metadata = new HashMap<>();
+    if (eventTime.isPresent()) {
+      metadata.put(METADATA_EVENT_TIME_KEY, String.valueOf(eventTime.get()));
+    }
+    return metadata.isEmpty() ? Option.empty() : Option.of(metadata);
+  }
+
+  protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
+                                                IndexedRecord incomingRecord,
+                                                Properties properties) {
+    /*
+     * Combining strategy here returns currentValue on disk if incoming record 
is older.
+     * The incoming record can be either a delete (sent as an upsert with 
_hoodie_is_deleted set to true)
+     * or an insert/update record. In any case, if it is older than the record 
+     * on disk, the currentValue
+     * in disk is returned (to be rewritten with new commit time).
+     *
+     * NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation 
type do not hit this code path
+     * and need to be dealt with separately.
+     */
+    String orderingField = getOrderingField(properties);
+    if (orderingField == null) {
+      return true;
+    }
+    boolean consistentLogicalTimestampEnabled = 
Boolean.parseBoolean(properties.getProperty(
+        
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+        
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
+    Object persistedOrderingVal = 
HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
+        orderingField,
+        true, consistentLogicalTimestampEnabled);
+    Comparable incomingOrderingVal = (Comparable) 
HoodieAvroUtils.getNestedFieldVal((GenericRecord) incomingRecord,
+        orderingField,
+        true, consistentLogicalTimestampEnabled);
+    return persistedOrderingVal == null || ((Comparable) 
persistedOrderingVal).compareTo(incomingOrderingVal) <= 0;
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
index 0546396a452..ebfda308c07 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
@@ -54,9 +54,13 @@ import static 
org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils.R
 
 public class HoodieTestReaderContext extends 
HoodieReaderContext<IndexedRecord> {
   private Option<HoodieRecordMerger> customMerger;
+  private Option<String> payloadClass;
 
-  public HoodieTestReaderContext(Option<HoodieRecordMerger> customMerger) {
/**
 * Creates a test reader context.
 *
 * @param customMerger optional custom {@link HoodieRecordMerger} used when merging records
 * @param payloadClass optional payload class name applied when constructing records;
 *                     defaults to {@code DefaultHoodieRecordPayload} when absent
 */
public HoodieTestReaderContext(
    Option<HoodieRecordMerger> customMerger,
    Option<String> payloadClass) {
  this.customMerger = customMerger;
  this.payloadClass = payloadClass;
}
 
   @Override
@@ -134,12 +138,16 @@ public class HoodieTestReaderContext extends 
HoodieReaderContext<IndexedRecord>
       Option<IndexedRecord> recordOpt,
       Map<String, Object> metadataMap
   ) {
+    String appliedPayloadClass =
+        payloadClass.isPresent()
+            ? payloadClass.get()
+            : DefaultHoodieRecordPayload.class.getName();
     if (!recordOpt.isPresent()) {
       return SpillableMapUtils.generateEmptyPayload(
           (String) metadataMap.get(INTERNAL_META_RECORD_KEY),
           (String) metadataMap.get(INTERNAL_META_PARTITION_PATH),
           (Comparable<?>) metadataMap.get(INTERNAL_META_ORDERING_FIELD),
-          DefaultHoodieRecordPayload.class.getName());
+          appliedPayloadClass);
     }
     return new HoodieAvroIndexedRecord(recordOpt.get());
   }
@@ -155,7 +163,9 @@ public class HoodieTestReaderContext extends 
HoodieReaderContext<IndexedRecord>
   }
 
   @Override
-  public ClosableIterator<IndexedRecord> 
mergeBootstrapReaders(ClosableIterator<IndexedRecord> skeletonFileIterator, 
ClosableIterator<IndexedRecord> dataFileIterator) {
+  public ClosableIterator<IndexedRecord> mergeBootstrapReaders(
+      ClosableIterator<IndexedRecord> skeletonFileIterator,
+      ClosableIterator<IndexedRecord> dataFileIterator) {
     return null;
   }
 


Reply via email to