This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e713cf95fd [HUDI-8635] Support numWrites metric for compaction (#13047)
5e713cf95fd is described below

commit 5e713cf95fda451bfec21a9c7ac68d2191d10ae6
Author: Lin Liu <141371752+linliu-c...@users.noreply.github.com>
AuthorDate: Sat Mar 29 04:18:25 2025 -0700

    [HUDI-8635] Support numWrites metric for compaction (#13047)
    
    * Support totalLogRecords metric
    
    * Add test for the compaction stats
    
    * Address comments
    
    * Address all comments
    
    * Address comments
    
    * Fix a failed test
    
    * Add support for numWrites
    
    * Address the comments
    
    * Fix inserts with log files
    
    ---------
    
    Co-authored-by: sivabalan <n.siv...@gmail.com>
---
 .../table/log/BaseHoodieLogRecordReader.java       |  1 +
 .../common/table/read/FileGroupRecordBuffer.java   | 10 ++++++++++
 .../table/read/HoodieFileGroupRecordBuffer.java    |  5 +++++
 .../read/PositionBasedFileGroupRecordBuffer.java   | 18 +++++++++++++++---
 .../table/action/compact/TestHoodieCompactor.java  | 22 +++++++++++++++++++---
 5 files changed, 50 insertions(+), 6 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index b74a0c9b92f..1586e3f5ccc 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -392,6 +392,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
 
       // Done
       progress = 1.0f;
+      totalLogRecords.set(recordBuffer.getTotalLogRecords());
     } catch (IOException e) {
       LOG.error("Got IOException when reading log file", e);
       throw new HoodieIOException("IOException when reading log file ", e);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
index 38ac956e07b..15c22883716 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
@@ -99,6 +99,7 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
   protected boolean enablePartialMerging = false;
   protected InternalSchema internalSchema;
   protected HoodieTableMetaClient hoodieTableMetaClient;
+  private long totalLogRecords = 0;
 
   protected FileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
                                   HoodieTableMetaClient hoodieTableMetaClient,
@@ -215,6 +216,10 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
     return records.size();
   }
 
+  public long getTotalLogRecords() {
+    return totalLogRecords;
+  }
+
   @Override
   public Iterator<Pair<Option<T>, Map<String, Object>>> getLogRecordIterator() 
{
     return records.values().iterator();
@@ -238,6 +243,7 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
                                                                                
  Map<String, Object> metadata,
                                                                                
  Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair)
       throws IOException {
+    totalLogRecords++;
     if (existingRecordMetadataPair != null) {
       if (enablePartialMerging) {
         // TODO(HUDI-7843): decouple the merging logic from the merger
@@ -341,6 +347,7 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
    */
   protected Option<DeleteRecord> doProcessNextDeletedRecord(DeleteRecord 
deleteRecord,
                                                             Pair<Option<T>, 
Map<String, Object>> existingRecordMetadataPair) {
+    totalLogRecords++;
     if (existingRecordMetadataPair != null) {
       switch (recordMergeMode) {
         case COMMIT_TIME_ORDERING:
@@ -572,7 +579,10 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
           nextRecordInfo.getLeft(), nextRecordInfo.getRight());
       if (resultRecord.isPresent()) {
         nextRecord = readerContext.seal(resultRecord.get());
+        readStats.incrementNumInserts();
         return true;
+      } else {
+        readStats.incrementNumDeletes();
       }
     }
     return false;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
index d9ba8bcd90e..a7b9423be2e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
@@ -90,6 +90,11 @@ public interface HoodieFileGroupRecordBuffer<T> {
    */
   int size();
 
+  /**
+   * @return the total number of log records processed.
+   */
+  long getTotalLogRecords();
+
   /**
    * @return An iterator on the log records.
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
index b0e039a4b68..bd450b77d31 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
@@ -246,9 +246,21 @@ public class PositionBasedFileGroupRecordBuffer<T> extends 
KeyBasedFileGroupReco
     Map<String, Object> metadata = readerContext.generateMetadataForRecord(
         baseRecord, readerSchema);
 
-    Option<T> resultRecord = logRecordInfo != null
-        ? merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(), 
logRecordInfo.getRight())
-        : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), 
metadata);
+    Option<T> resultRecord = Option.empty();
+    if (logRecordInfo != null) {
+      resultRecord = merge(
+          Option.of(baseRecord), metadata, logRecordInfo.getLeft(), 
logRecordInfo.getRight());
+      if (resultRecord.isPresent()) {
+        nextRecord = readerContext.seal(resultRecord.get());
+        readStats.incrementNumUpdates();
+      } else {
+        readStats.incrementNumDeletes();
+      }
+    } else {
+      resultRecord = merge(Option.empty(), Collections.emptyMap(), 
Option.of(baseRecord), metadata);
+      readStats.incrementNumInserts();
+    }
+
     if (resultRecord.isPresent()) {
       nextRecord = readerContext.seal(resultRecord.get());
       return true;
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index ceefcdf4924..ae05f28e8f3 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -247,7 +247,7 @@ public class TestHoodieCompactor extends 
HoodieSparkClientTestHarness {
         assertLogFilesNumEqualsTo(config, i);
       }
       HoodieWriteMetadata result = compact(writeClient, String.format("10%s", 
i));
-      verifyCompaction(result);
+      verifyCompaction(result, 4000L);
 
       // Verify compaction.requested, compaction.completed metrics counts.
       assertEquals(1, 
getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
@@ -282,7 +282,7 @@ public class TestHoodieCompactor extends 
HoodieSparkClientTestHarness {
         assertLogFilesNumEqualsTo(config, 1);
 
         HoodieWriteMetadata result = compact(writeClient, 
writeClient.createNewInstantTime());
-        verifyCompaction(result);
+        verifyCompaction(result, 100L);
 
         // Verify compaction.requested, compaction.completed metrics counts.
         assertEquals(i / 2 + 1, 
getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
@@ -470,13 +470,14 @@ public class TestHoodieCompactor extends 
HoodieSparkClientTestHarness {
   /**
    * Verify that all partition paths are present in the HoodieWriteMetadata 
result.
    */
-  private void verifyCompaction(HoodieWriteMetadata compactionMetadata) {
+  private void verifyCompaction(HoodieWriteMetadata compactionMetadata, long 
expectedTotalLogRecords) {
     assertTrue(compactionMetadata.getWriteStats().isPresent());
     List<HoodieWriteStat> stats = (List<HoodieWriteStat>) 
compactionMetadata.getWriteStats().get();
     assertEquals(dataGen.getPartitionPaths().length, stats.size());
     for (String partitionPath : dataGen.getPartitionPaths()) {
       assertTrue(stats.stream().anyMatch(stat -> 
stat.getPartitionPath().contentEquals(partitionPath)));
     }
+
     stats.forEach(stat -> {
       HoodieWriteStat.RuntimeStats runtimeStats = stat.getRuntimeStats();
       assertNotNull(runtimeStats);
@@ -484,5 +485,20 @@ public class TestHoodieCompactor extends 
HoodieSparkClientTestHarness {
       assertTrue(runtimeStats.getTotalUpsertTime() > 0);
       assertTrue(runtimeStats.getTotalScanTime() > 0);
     });
+
+    // Verify the number of log records processed during the compaction.
+    long actualTotalLogRecords =
+        stats.stream().mapToLong(HoodieWriteStat::getTotalLogRecords).sum();
+    assertEquals(expectedTotalLogRecords, actualTotalLogRecords);
+
+    // Verify the number of records written during compaction.
+    long actualNumWritten =
+        stats.stream().mapToLong(HoodieWriteStat::getNumWrites).sum();
+    long actualNumUpdates =
+        stats.stream().mapToLong(HoodieWriteStat::getNumUpdateWrites).sum();
+    long actualInserts =
+        stats.stream().mapToLong(HoodieWriteStat::getNumInserts).sum();
+    assertTrue(actualNumWritten > 0
+        && actualNumWritten == actualNumUpdates + actualInserts);
   }
 }

Reply via email to