This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 5e713cf95fd [HUDI-8635] Support numWrites metric for compaction (#13047) 5e713cf95fd is described below commit 5e713cf95fda451bfec21a9c7ac68d2191d10ae6 Author: Lin Liu <141371752+linliu-c...@users.noreply.github.com> AuthorDate: Sat Mar 29 04:18:25 2025 -0700 [HUDI-8635] Support numWrites metric for compaction (#13047) * Support totalLogRecord metric * Add test for the compaction stats * Address comments * Address all comments * Address comments * Fix a failed test * Add support for numWrites * Address the comments * Fixing inserts with log files --------- Co-authored-by: sivabalan <n.siv...@gmail.com> --- .../table/log/BaseHoodieLogRecordReader.java | 1 + .../common/table/read/FileGroupRecordBuffer.java | 10 ++++++++++ .../table/read/HoodieFileGroupRecordBuffer.java | 5 +++++ .../read/PositionBasedFileGroupRecordBuffer.java | 18 +++++++++++++++--- .../table/action/compact/TestHoodieCompactor.java | 22 +++++++++++++++++++--- 5 files changed, 50 insertions(+), 6 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java index b74a0c9b92f..1586e3f5ccc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java @@ -392,6 +392,7 @@ public abstract class BaseHoodieLogRecordReader<T> { // Done progress = 1.0f; + totalLogRecords.set(recordBuffer.getTotalLogRecords()); } catch (IOException e) { LOG.error("Got IOException when reading log file", e); throw new HoodieIOException("IOException when reading log file ", e); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java index 
38ac956e07b..15c22883716 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java @@ -99,6 +99,7 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB protected boolean enablePartialMerging = false; protected InternalSchema internalSchema; protected HoodieTableMetaClient hoodieTableMetaClient; + private long totalLogRecords = 0; protected FileGroupRecordBuffer(HoodieReaderContext<T> readerContext, HoodieTableMetaClient hoodieTableMetaClient, @@ -215,6 +216,10 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB return records.size(); } + public long getTotalLogRecords() { + return totalLogRecords; + } + @Override public Iterator<Pair<Option<T>, Map<String, Object>>> getLogRecordIterator() { return records.values().iterator(); @@ -238,6 +243,7 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB Map<String, Object> metadata, Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) throws IOException { + totalLogRecords++; if (existingRecordMetadataPair != null) { if (enablePartialMerging) { // TODO(HUDI-7843): decouple the merging logic from the merger @@ -341,6 +347,7 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB */ protected Option<DeleteRecord> doProcessNextDeletedRecord(DeleteRecord deleteRecord, Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) { + totalLogRecords++; if (existingRecordMetadataPair != null) { switch (recordMergeMode) { case COMMIT_TIME_ORDERING: @@ -572,7 +579,10 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB nextRecordInfo.getLeft(), nextRecordInfo.getRight()); if (resultRecord.isPresent()) { nextRecord = readerContext.seal(resultRecord.get()); + readStats.incrementNumInserts(); return true; + } else { + 
readStats.incrementNumDeletes(); } } return false; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java index d9ba8bcd90e..a7b9423be2e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java @@ -90,6 +90,11 @@ public interface HoodieFileGroupRecordBuffer<T> { */ int size(); + /** + * @return the total number of log records processed. + */ + long getTotalLogRecords(); + /** * @return An iterator on the log records. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java index b0e039a4b68..bd450b77d31 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java @@ -246,9 +246,21 @@ public class PositionBasedFileGroupRecordBuffer<T> extends KeyBasedFileGroupReco Map<String, Object> metadata = readerContext.generateMetadataForRecord( baseRecord, readerSchema); - Option<T> resultRecord = logRecordInfo != null - ? 
merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight()) - : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), metadata); + Option<T> resultRecord = Option.empty(); + if (logRecordInfo != null) { + resultRecord = merge( + Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight()); + if (resultRecord.isPresent()) { + nextRecord = readerContext.seal(resultRecord.get()); + readStats.incrementNumUpdates(); + } else { + readStats.incrementNumDeletes(); + } + } else { + resultRecord = merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), metadata); + readStats.incrementNumInserts(); + } + if (resultRecord.isPresent()) { nextRecord = readerContext.seal(resultRecord.get()); return true; diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java index ceefcdf4924..ae05f28e8f3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java @@ -247,7 +247,7 @@ public class TestHoodieCompactor extends HoodieSparkClientTestHarness { assertLogFilesNumEqualsTo(config, i); } HoodieWriteMetadata result = compact(writeClient, String.format("10%s", i)); - verifyCompaction(result); + verifyCompaction(result, 4000L); // Verify compaction.requested, compaction.completed metrics counts. 
assertEquals(1, getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX)); @@ -282,7 +282,7 @@ public class TestHoodieCompactor extends HoodieSparkClientTestHarness { assertLogFilesNumEqualsTo(config, 1); HoodieWriteMetadata result = compact(writeClient, writeClient.createNewInstantTime()); - verifyCompaction(result); + verifyCompaction(result, 100L); // Verify compaction.requested, compaction.completed metrics counts. assertEquals(i / 2 + 1, getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX)); @@ -470,13 +470,14 @@ public class TestHoodieCompactor extends HoodieSparkClientTestHarness { /** * Verify that all partition paths are present in the HoodieWriteMetadata result. */ - private void verifyCompaction(HoodieWriteMetadata compactionMetadata) { + private void verifyCompaction(HoodieWriteMetadata compactionMetadata, long expectedTotalLogRecords) { assertTrue(compactionMetadata.getWriteStats().isPresent()); List<HoodieWriteStat> stats = (List<HoodieWriteStat>) compactionMetadata.getWriteStats().get(); assertEquals(dataGen.getPartitionPaths().length, stats.size()); for (String partitionPath : dataGen.getPartitionPaths()) { assertTrue(stats.stream().anyMatch(stat -> stat.getPartitionPath().contentEquals(partitionPath))); } + stats.forEach(stat -> { HoodieWriteStat.RuntimeStats runtimeStats = stat.getRuntimeStats(); assertNotNull(runtimeStats); @@ -484,5 +485,20 @@ public class TestHoodieCompactor extends HoodieSparkClientTestHarness { assertTrue(runtimeStats.getTotalUpsertTime() > 0); assertTrue(runtimeStats.getTotalScanTime() > 0); }); + + // Verify the number of log records processed during the compaction. + long actualTotalLogRecords = + stats.stream().mapToLong(HoodieWriteStat::getTotalLogRecords).sum(); + assertEquals(expectedTotalLogRecords, actualTotalLogRecords); + + // Verify the number of records written during compaction. 
+ long actualNumWritten = + stats.stream().mapToLong(HoodieWriteStat::getNumWrites).sum(); + long actualNumUpdates = + stats.stream().mapToLong(HoodieWriteStat::getNumUpdateWrites).sum(); + long actualInserts = + stats.stream().mapToLong(HoodieWriteStat::getNumInserts).sum(); + assertTrue(actualNumWritten > 0 + && actualNumWritten == actualNumUpdates + actualInserts); } }