This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new db5e11c853cb [HUDI-7219] Add caching support for HFileBlocks (#13724)
db5e11c853cb is described below

commit db5e11c853cbfec82dea47ffff790c915ba7bde9
Author: voonhous <[email protected]>
AuthorDate: Tue Sep 16 21:47:24 2025 +0800

    [HUDI-7219] Add caching support for HFileBlocks (#13724)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../hudi/common/config/HoodieReaderConfig.java     |  28 +++
 .../apache/hudi/io/storage/HFileReaderFactory.java |  29 +++
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  30 ++-
 .../TestHoodieNativeAvroHFileReaderCaching.java    | 259 +++++++++++++++++++++
 hudi-io/pom.xml                                    |   5 +
 .../hudi/io/hfile/CachingHFileReaderImpl.java      | 145 ++++++++++++
 .../org/apache/hudi/io/hfile/HFileBlockCache.java  | 155 ++++++++++++
 .../org/apache/hudi/io/hfile/HFileReaderImpl.java  |  53 +++--
 .../apache/hudi/io/hfile/TestHFileBlockCache.java  | 178 ++++++++++++++
 .../hudi/functional/RecordLevelIndexTestBase.scala |   3 +-
 10 files changed, 855 insertions(+), 30 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
index 56935fc5d817..1ede383b0895 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
@@ -89,4 +89,32 @@ public class HoodieReaderConfig extends HoodieConfig {
       "hoodie.write.record.merge.custom.implementation.classes";
   public static final String 
RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY =
       "hoodie.datasource.write.record.merger.impls";
+
+  public static final ConfigProperty<Boolean> HFILE_BLOCK_CACHE_ENABLED = 
ConfigProperty
+      .key("hoodie.hfile.block.cache.enabled")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Enable HFile block-level caching for metadata files. 
This caches frequently "
+          + "accessed HFile blocks in memory to reduce I/O operations during 
metadata queries. "
+          + "Improves performance for workloads with repeated metadata access 
patterns.");
+
+  public static final ConfigProperty<Integer> HFILE_BLOCK_CACHE_SIZE = 
ConfigProperty
+      .key("hoodie.hfile.block.cache.size")
+      .defaultValue(100)
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Maximum number of HFile blocks to cache in memory 
per metadata file reader. "
+          + "Higher values improve cache hit rates but consume more memory. "
+          + "Only effective when hfile.block.cache.enabled is true.");
+
+  public static final ConfigProperty<Integer> HFILE_BLOCK_CACHE_TTL_MINUTES = 
ConfigProperty
+      .key("hoodie.hfile.block.cache.ttl.minutes")
+      .defaultValue(60)
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Time-to-live (TTL) in minutes for cached HFile 
blocks. Blocks are evicted "
+          + "from the cache after this duration to prevent memory leaks. "
+          + "Only effective when hfile.block.cache.enabled is true.");
+
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HFileReaderFactory.java 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HFileReaderFactory.java
index 665c38c14c62..1eb84469739a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HFileReaderFactory.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HFileReaderFactory.java
@@ -20,13 +20,17 @@
 package org.apache.hudi.io.storage;
 
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Either;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.hash.MurmurHash;
 import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
 import org.apache.hudi.io.ByteArraySeekableDataInputStream;
 import org.apache.hudi.io.SeekableDataInputStream;
+import org.apache.hudi.io.hfile.CachingHFileReaderImpl;
 import org.apache.hudi.io.hfile.HFileReader;
 import org.apache.hudi.io.hfile.HFileReaderImpl;
 import org.apache.hudi.storage.HoodieStorage;
@@ -42,6 +46,7 @@ public class HFileReaderFactory {
 
   private final HoodieStorage storage;
   private final HoodieMetadataConfig metadataConfig;
+  private final TypedProperties properties;
   private final Either<StoragePath, byte[]> fileSource;
 
   public HFileReaderFactory(HoodieStorage storage,
@@ -49,15 +54,39 @@ public class HFileReaderFactory {
                             Either<StoragePath, byte[]> fileSource) {
     this.storage = storage;
     this.metadataConfig = 
HoodieMetadataConfig.newBuilder().withProperties(properties).build();
+    this.properties = properties;
     this.fileSource = fileSource;
   }
 
   public HFileReader createHFileReader() throws IOException {
     final long fileSize = determineFileSize();
     final SeekableDataInputStream inputStream = createInputStream(fileSize);
+    
+    if (shouldEnableBlockCaching()) {
+      int blockCacheSize = ConfigUtils.getIntWithAltKeys(
+          properties, HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE);
+      int cacheTtlMinutes = ConfigUtils.getIntWithAltKeys(
+          properties, HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES);
+      String filePath = getFilePath();
+      return new CachingHFileReaderImpl(inputStream, fileSize, filePath, 
blockCacheSize, cacheTtlMinutes);
+    }
+    
     return new HFileReaderImpl(inputStream, fileSize);
   }
 
+  private boolean shouldEnableBlockCaching() {
+    return ConfigUtils.getBooleanWithAltKeys(properties, 
HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED);
+  }
+
+  private String getFilePath() {
+    if (fileSource.isLeft()) {
+      return fileSource.asLeft().toString();
+    }
+    // For byte array content, use a hash-based identifier
+    int murmurHash = MurmurHash.getInstance().hash(fileSource.asRight());
+    return String.valueOf(murmurHash);
+  }
+
   private long determineFileSize() throws IOException {
     if (fileSource.isLeft()) {
       return storage.getPathInfo(fileSource.asLeft()).getLength();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index e8b009e3a109..ee2b4b1a41c2 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -22,7 +22,9 @@ import org.apache.hudi.avro.HoodieAvroReaderContext;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieListData;
@@ -86,6 +88,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -506,14 +509,18 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     // If reuse is enabled and full scan is allowed for the partition, we can 
reuse the file readers for base files and the reader context for the log files.
     Map<StoragePath, HoodieAvroFileReader> baseFileReaders = 
Collections.emptyMap();
     ReusableFileGroupRecordBufferLoader<IndexedRecord> recordBufferLoader = 
null;
-    if (reuse && isFullScanAllowedForPartition(fileSlice.getPartitionPath())) {
+    boolean shouldReuse = reuse && 
isFullScanAllowedForPartition(fileSlice.getPartitionPath());
+    if (shouldReuse) {
       Pair<HoodieAvroFileReader, 
ReusableFileGroupRecordBufferLoader<IndexedRecord>> readers =
           reusableFileReaders.computeIfAbsent(fileSlice.getFileGroupId(), fgId 
-> {
             try {
               HoodieAvroFileReader baseFileReader = null;
               if (fileSlice.getBaseFile().isPresent()) {
+                TypedProperties props = 
TypedProperties.copy(metadataConfig.getProps());
+                setHFileBlockCacheProps(props);
+                HoodieConfig newConfig = new HoodieConfig(props);
                 baseFileReader = (HoodieAvroFileReader) 
HoodieIOFactory.getIOFactory(storage).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
-                    .getFileReader(metadataConfig, 
fileSlice.getBaseFile().get().getStoragePath(), 
metadataMetaClient.getTableConfig().getBaseFileFormat(), Option.empty());
+                    .getFileReader(newConfig, 
fileSlice.getBaseFile().get().getStoragePath(), 
metadataMetaClient.getTableConfig().getBaseFileFormat(), Option.empty());
               }
               return Pair.of(baseFileReader, 
buildReusableRecordBufferLoader(fileSlice, latestMetadataInstantTime, 
instantRange));
             } catch (IOException ex) {
@@ -542,7 +549,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
         .withFileSlice(fileSlice)
         .withDataSchema(SCHEMA)
         .withRequestedSchema(SCHEMA)
-        .withProps(buildFileGroupReaderProperties(metadataConfig))
+        .withProps(buildFileGroupReaderProperties(metadataConfig, shouldReuse))
         .withRecordBufferLoader(recordBufferLoader)
         
.withEnableOptimizedLogBlockScan(metadataConfig.isOptimizedLogBlocksScanEnabled())
         .build();
@@ -855,7 +862,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
   /**
    * Derive necessary properties for FG reader.
    */
-  TypedProperties buildFileGroupReaderProperties(HoodieMetadataConfig 
metadataConfig) {
+  TypedProperties buildFileGroupReaderProperties(HoodieMetadataConfig 
metadataConfig, boolean shouldReuse) {
     HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder()
         .fromProperties(metadataConfig.getProps()).build();
     TypedProperties props = new TypedProperties();
@@ -871,6 +878,21 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     props.setProperty(
         DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
         Boolean.toString(commonConfig.isBitCaskDiskMapCompressionEnabled()));
+    if (shouldReuse) {
+      setHFileBlockCacheProps(props);
+    } else {
+      props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED.key(),
+          
metadataConfig.getStringOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED));
+    }
+    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE.key(),
+        
metadataConfig.getStringOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE));
+    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES.key(),
+        
metadataConfig.getStringOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES));
     return props;
   }
+
+  private void setHFileBlockCacheProps(Properties props) {
+    // Enable HFile block caching for reuse and full scan usage
+    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED.key(), 
"true");
+  }
 }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieNativeAvroHFileReaderCaching.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieNativeAvroHFileReaderCaching.java
new file mode 100644
index 000000000000..2b86245ce824
--- /dev/null
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieNativeAvroHFileReaderCaching.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.io.hadoop.HoodieAvroHFileWriter;
+import org.apache.hudi.io.hadoop.TestHoodieOrcReaderWriter;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.function.Supplier;
+
+import static 
org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestHoodieNativeAvroHFileReaderCaching {
+
+  @TempDir
+  public static Path tempDir;
+
+  // Fixed seed for reproducible tests
+  private static final Random RANDOM = new Random(42);
+  private static final int KEYS_TO_LOOKUP = 10_000;
+  private static final List<String> EXISTING_KEYS = new ArrayList<>();
+  private static final List<String> MISSING_KEYS = new ArrayList<>();
+  private static HoodieStorage storage;
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    storage = HoodieTestUtils.getStorage(getFilePath());
+    Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, 
"/exampleSchemaWithMetaFields.avsc");
+    HoodieAvroHFileWriter writer = createWriter(avroSchema, true);
+
+    // Write enough records for realistic testing
+    final int numRecords = 50_000;
+    System.out.println("Creating HFile with " + numRecords + " records...");
+
+    for (int i = 0; i < numRecords; i++) {
+      String key = String.format("key_%08d", i);
+      EXISTING_KEYS.add(key);
+
+      GenericRecord record = new GenericData.Record(avroSchema);
+      record.put("_row_key", key);
+      record.put("time", Integer.toString(RANDOM.nextInt()));
+      record.put("number", i);
+
+      writer.writeAvroWithMetadata(
+          new HoodieAvroRecord(new HoodieKey(key, "partition_" + (i % 10)),
+              new EmptyHoodieRecordPayload()).getKey(), record);
+    }
+    writer.close();
+
+    // Generate missing keys that don't exist in the HFile
+    for (int i = 0; i < KEYS_TO_LOOKUP; i++) {
+      String missingKey = String.format("missing_key_%08d", i + numRecords);
+      MISSING_KEYS.add(missingKey);
+    }
+
+    System.out.println("HFile created with " + EXISTING_KEYS.size() + " 
existing keys");
+    System.out.println("Generated " + MISSING_KEYS.size() + " missing keys for 
testing");
+  }
+
+  @Test
+  @Disabled("Enable this for local performance tests")
+  public void testBlockCachePerformanceOnRecordLevelIndex() throws Exception {
+    System.out.println("\n=== HFile BlockCache Performance Test ===");
+
+    // Test existing keys lookup performance
+    testExistingKeysLookup();
+
+    // Test missing keys lookup performance 
+    testMissingKeysLookup();
+
+    
System.out.println("================================================================\n");
+  }
+
+  private void testExistingKeysLookup() throws Exception {
+    System.out.println("\n--- Testing " + KEYS_TO_LOOKUP + " Existing Key 
Lookups ---");
+
+    // Select 10K random existing keys
+    Collections.shuffle(EXISTING_KEYS, RANDOM);
+    List<String> testKeys = EXISTING_KEYS.subList(0, KEYS_TO_LOOKUP);
+    testKeys.sort(String::compareTo);
+
+    // Warm up JVM
+    warmupReads(testKeys.subList(0, 1000));
+
+    // Measure with cache first, then without cache
+    long cacheTime = measureLookupTime(testKeys, true);
+    long noCacheTime = measureLookupTime(testKeys, false);
+
+    double speedup = (double) noCacheTime / cacheTime;
+
+    System.out.printf(KEYS_TO_LOOKUP + " Existing Key Lookups:\n");
+    System.out.printf("  - Without BlockCache: %d ms\n", noCacheTime);
+    System.out.printf("  - With BlockCache: %d ms\n", cacheTime);
+    System.out.printf("  - Speedup: %.2fx\n", speedup);
+    System.out.printf("  - Performance Improvement: %.1f%%\n", (speedup - 1) * 
100);
+
+    assertTrue(speedup > 1.0, "BlockCache should provide speedup for existing 
key lookups");
+  }
+
+  private void testMissingKeysLookup() throws Exception {
+    System.out.println("\n--- Testing " + KEYS_TO_LOOKUP + " Missing Key 
Lookups ---");
+
+    // Use all 10K missing keys
+    List<String> testKeys = new ArrayList<>(MISSING_KEYS);
+
+    // Warm up JVM 
+    warmupReads(testKeys.subList(0, 1000));
+
+    // Test without cache
+    long noCacheTime = measureLookupTime(testKeys, false, false);
+
+    // Test with cache  
+    long cacheTime = measureLookupTime(testKeys, true, false);
+
+    double speedup = (double) noCacheTime / cacheTime;
+
+    System.out.printf(KEYS_TO_LOOKUP + " Missing Key Lookups:\n");
+    System.out.printf("  - Without BlockCache: %d ms\n", noCacheTime);
+    System.out.printf("  - With BlockCache: %d ms\n", cacheTime);
+    System.out.printf("  - Speedup: %.2fx\n", speedup);
+    System.out.printf("  - Performance Improvement: %.1f%%\n", (speedup - 1) * 
100);
+
+    // Missing keys may not benefit as much from caching but should not be 
significantly slower
+    assertTrue(speedup >= 0.8, "BlockCache should not significantly slow down 
missing key lookups");
+  }
+
+  private void warmupReads(List<String> keys) throws Exception {
+    // Warm up JVM to reduce noise in measurements
+    try (HoodieAvroHFileReaderImplBase reader = createReader(storage, true, 
true)) {
+      ClosableIterator<IndexedRecord> iter = 
reader.getIndexedRecordsByKeysIterator(keys, reader.getSchema());
+      // Consume all records
+      toStream(iter).forEach(record -> {
+      });
+    }
+  }
+
+  private long measureLookupTime(List<String> keys, boolean enableCache) 
throws Exception {
+    return measureLookupTime(keys, enableCache, false);
+  }
+
+  private long measureLookupTime(List<String> keys, boolean enableCache, 
boolean useBloomFilter) throws Exception {
+    // Force garbage collection before measurement
+    System.gc();
+    Thread.sleep(100); // Allow GC to complete
+
+    long startTime = System.nanoTime();
+    int totalRecordCount = 0;
+
+    // Look up each key individually to test realistic usage pattern
+
+    for (String key : keys) {
+      try (HoodieAvroHFileReaderImplBase reader = createReader(storage, 
useBloomFilter, enableCache)) {
+        List<String> singleKey = Collections.singletonList(key);
+        ClosableIterator<IndexedRecord> iter = 
reader.getIndexedRecordsByKeysIterator(singleKey, reader.getSchema());
+
+        // Count records for this key
+        while (iter.hasNext()) {
+          iter.next();
+          totalRecordCount++;
+        }
+        iter.close();
+      }
+    }
+    long endTime = System.nanoTime();
+
+    // Validate we found the expected number of records
+    boolean isExistingKeysTest = keys.size() <= EXISTING_KEYS.size()
+        && EXISTING_KEYS.containsAll(keys);
+    if (isExistingKeysTest) {
+      assertTrue(totalRecordCount > 0, "Should find existing records");
+    }
+
+    return (endTime - startTime) / 1_000_000; // Convert to milliseconds
+  }
+
+  // Helper methods
+
+  private static HoodieAvroHFileWriter createWriter(Schema avroSchema, boolean 
populateMetaFields) throws Exception {
+    String instantTime = "000";
+    HoodieStorage storage = HoodieTestUtils.getStorage(getFilePath());
+    Properties props = new Properties();
+    props.setProperty(HoodieTableConfig.POPULATE_META_FIELDS.key(), 
Boolean.toString(populateMetaFields));
+    TaskContextSupplier mockTaskContextSupplier = 
mock(TaskContextSupplier.class);
+    Supplier<Integer> partitionSupplier = mock(Supplier.class);
+    
when(mockTaskContextSupplier.getPartitionIdSupplier()).thenReturn(partitionSupplier);
+    when(partitionSupplier.get()).thenReturn(10);
+
+    return (HoodieAvroHFileWriter) HoodieFileWriterFactory.getFileWriter(
+        instantTime, getFilePath(), storage, 
HoodieStorageConfig.newBuilder().fromProperties(props).build(), avroSchema,
+        mockTaskContextSupplier, HoodieRecord.HoodieRecordType.AVRO);
+  }
+
+  private static StoragePath getFilePath() {
+    return new StoragePath(tempDir.toString() + "/perf_test.hfile");
+  }
+
+  private HoodieAvroHFileReaderImplBase createReader(HoodieStorage storage, 
boolean useBloomFilter, boolean enableCache) throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED.key(), 
String.valueOf(enableCache));
+    // Use a cache that can hold 100 blocks
+    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE.key(), 
String.valueOf(100));
+
+    HFileReaderFactory readerFactory = HFileReaderFactory.builder()
+        .withStorage(storage)
+        .withPath(getFilePath())
+        .withProps(props)
+        .build();
+    return HoodieNativeAvroHFileReader.builder()
+        
.readerFactory(readerFactory).path(getFilePath()).useBloomFilter(useBloomFilter).build();
+  }
+}
diff --git a/hudi-io/pom.xml b/hudi-io/pom.xml
index c2a08e7e22fb..5aa7aff0c97f 100644
--- a/hudi-io/pom.xml
+++ b/hudi-io/pom.xml
@@ -135,6 +135,11 @@
       <artifactId>protobuf-java</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>io.airlift</groupId>
       <artifactId>aircompressor</artifactId>
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/io/hfile/CachingHFileReaderImpl.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/CachingHFileReaderImpl.java
new file mode 100644
index 000000000000..3c4d4b055f8e
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/CachingHFileReaderImpl.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.io.SeekableDataInputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * HFile reader implementation with integrated caching functionality. This 
extends HFileReaderImpl and overrides the block instantiation method to add 
caching capabilities.
+ * <p>
+ * Uses a shared static cache across all instances to maximize cache hits when 
multiple readers access the same file.
+ */
+public class CachingHFileReaderImpl extends HFileReaderImpl {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CachingHFileReaderImpl.class);
+
+  private static volatile HFileBlockCache GLOBAL_BLOCK_CACHE;
+  // Store first config values to check against cache config
+  private static volatile Integer INITIAL_CACHE_SIZE;
+  private static volatile Integer INITIAL_CACHE_TTL;
+  private static final Object CACHE_LOCK = new Object();
+
+  private final String filePath;
+
+  public CachingHFileReaderImpl(SeekableDataInputStream stream, long fileSize, 
String filePath, int cacheSize, int cacheTtlMinutes) {
+    super(stream, fileSize);
+    this.filePath = filePath;
+    // Initialize global cache with provided config (ignored if already 
initialized)
+    getGlobalCache(cacheSize, cacheTtlMinutes);
+  }
+
+  /**
+   * Gets or creates the global cache shared by all CachingHFileReaderImpl 
instances.
+   * Thread-safe singleton pattern with double-checked locking.
+   */
+  private static HFileBlockCache getGlobalCache(int cacheSize, int 
cacheTtlMinutes) {
+    if (GLOBAL_BLOCK_CACHE == null) {
+      synchronized (CACHE_LOCK) {
+        if (GLOBAL_BLOCK_CACHE == null) {
+          LOG.info("Initializing global HFileBlockCache with size: {}, TTL: {} 
minutes.",
+              cacheSize, cacheTtlMinutes);
+          // Store the config used for initialization
+          INITIAL_CACHE_SIZE = cacheSize;
+          INITIAL_CACHE_TTL = cacheTtlMinutes;
+          GLOBAL_BLOCK_CACHE = new HFileBlockCache(
+              cacheSize,
+              cacheTtlMinutes,
+              TimeUnit.MINUTES);
+        } else if (!INITIAL_CACHE_SIZE.equals(cacheSize) || 
!INITIAL_CACHE_TTL.equals(cacheTtlMinutes)) {
+          // Log a warning if a different config is provided after 
initialization
+          LOG.warn("HFile block cache is already initialized. The provided 
configuration is being ignored. "
+                  + "Existing config: [Size: {}, TTL: {} mins], Ignored 
config: [Size: {}, TTL: {} mins].",
+              INITIAL_CACHE_SIZE, INITIAL_CACHE_TTL,
+              cacheSize, cacheTtlMinutes);
+        }
+      }
+    }
+    return GLOBAL_BLOCK_CACHE;
+  }
+
+  @Override
+  public HFileDataBlock instantiateHFileDataBlock(BlockIndexEntry blockToRead) 
throws IOException {
+    HFileBlockCache.BlockCacheKey cacheKey = new HFileBlockCache.BlockCacheKey(
+        filePath, blockToRead.getOffset(), blockToRead.getSize());
+
+    try {
+      HFileBlock block = GLOBAL_BLOCK_CACHE.getOrCompute(cacheKey, () -> 
super.instantiateHFileDataBlock(blockToRead));
+      return (HFileDataBlock) block;
+    } catch (IOException | RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException("Failed to load HFile block", e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    // NOTE: Do not clear the shared cache when closing individual readers
+    // The cache is shared across all instances
+    super.close();
+  }
+
+  /**
+   * Gets current cache size from the global cache.
+   *
+   * @return number of cached blocks
+   */
+  public long getCacheSize() {
+    return GLOBAL_BLOCK_CACHE != null ? GLOBAL_BLOCK_CACHE.size() : 0;
+  }
+
+  /**
+   * Clears the global block cache.
+   */
+  public void clearCache() {
+    if (GLOBAL_BLOCK_CACHE != null) {
+      GLOBAL_BLOCK_CACHE.clear();
+    }
+  }
+
+  /**
+   * Gets cache statistics for monitoring optimization effectiveness.
+   *
+   * @return string representation of cache statistics
+   */
+  public String getCacheStats() {
+    return "HFileReader Cache Stats - Size: " + (GLOBAL_BLOCK_CACHE != null ? 
GLOBAL_BLOCK_CACHE.size() : 0);
+  }
+
+  /**
+   * Clears the global cache. Should only be used for testing.
+   */
+  public static void resetGlobalCache() {
+    synchronized (CACHE_LOCK) {
+      if (GLOBAL_BLOCK_CACHE != null) {
+        GLOBAL_BLOCK_CACHE.clear();
+        GLOBAL_BLOCK_CACHE = null;
+      }
+      INITIAL_CACHE_SIZE = null;
+      INITIAL_CACHE_TTL = null;
+    }
+  }
+}
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockCache.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockCache.java
new file mode 100644
index 000000000000..512358da7487
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockCache.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Size-bounded cache for HFile blocks that improves read performance by avoiding repeated block reads.
+ *
+ * <p>Backed by a Caffeine {@link Cache} configured with a maximum entry count and an expire-after-access
+ * duration. Which entry is dropped when the cache is over capacity is decided by Caffeine's size-based
+ * eviction policy, not by a strict least-frequently-used ordering, so callers should not rely on a
+ * particular eviction order.
+ *
+ * <p>All operations delegate to the underlying Caffeine cache, which supports concurrent access.
+ */
+public class HFileBlockCache {
+
+  private final Cache<BlockCacheKey, HFileBlock> cache;
+
+  /**
+   * Creates a block cache.
+   *
+   * @param maxCacheSize     maximum number of blocks to retain before entries become eligible for eviction
+   * @param expireAfterWrite expiry duration; NOTE: despite the parameter name, the value is applied as an
+   *                         expire-after-access duration, so the timer restarts whenever an entry is read or written
+   * @param timeUnit         unit of {@code expireAfterWrite}
+   */
+  public HFileBlockCache(int maxCacheSize, long expireAfterWrite, TimeUnit timeUnit) {
+    this.cache = Caffeine.newBuilder()
+        .maximumSize(maxCacheSize)
+        .expireAfterAccess(Duration.ofMillis(timeUnit.toMillis(expireAfterWrite)))
+        .build();
+  }
+
+  /**
+   * Gets a block from cache.
+   *
+   * @param key the cache key
+   * @return cached block or null if not found
+   */
+  public HFileBlock getBlock(BlockCacheKey key) {
+    return cache.getIfPresent(key);
+  }
+
+  /**
+   * Puts a block into cache, replacing any block previously stored under the same key.
+   *
+   * @param key   the cache key
+   * @param block the block to cache
+   */
+  public void putBlock(BlockCacheKey key, HFileBlock block) {
+    cache.put(key, block);
+  }
+
+  /**
+   * Gets a block from cache, or computes and caches it if not present.
+   *
+   * <p>If the loader fails, nothing is cached and the loader's own exception is rethrown unchanged, so a
+   * caller can catch e.g. {@link java.io.IOException} directly instead of unwrapping a {@link RuntimeException}.
+   *
+   * @param key      the cache key
+   * @param loader   callable to load the block if not in cache
+   * @return cached or newly computed block
+   * @throws Exception the exception thrown by the loader, if any
+   */
+  public HFileBlock getOrCompute(BlockCacheKey key, Callable<HFileBlock> loader) throws Exception {
+    try {
+      // Caffeine takes a Function, which cannot throw checked exceptions, so checked failures are
+      // tunnelled through a private unchecked wrapper and unwrapped again below.
+      return cache.get(key, k -> {
+        try {
+          return loader.call();
+        } catch (RuntimeException e) {
+          throw e;
+        } catch (Exception e) {
+          throw new LoaderFailure(e);
+        }
+      });
+    } catch (LoaderFailure e) {
+      throw e.getCheckedCause();
+    }
+  }
+
+  /**
+   * Clears all cached blocks.
+   */
+  public void clear() {
+    cache.invalidateAll();
+  }
+
+  /**
+   * Gets current cache size.
+   *
+   * @return number of cached blocks, as estimated by the underlying cache; the value may lag behind
+   *         recent writes or pending evictions until {@link #cleanUp()} has run
+   */
+  public long size() {
+    return cache.estimatedSize();
+  }
+
+  /**
+   * Forces cache maintenance operations like eviction.
+   * This is useful for testing to ensure consistent behavior.
+   */
+  public void cleanUp() {
+    cache.cleanUp();
+  }
+
+  /**
+   * Unchecked carrier used only inside {@link #getOrCompute} to move a checked loader exception
+   * across Caffeine's {@code Function}-based API.
+   */
+  private static final class LoaderFailure extends RuntimeException {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Exception checkedCause;
+
+    LoaderFailure(Exception checkedCause) {
+      super(checkedCause);
+      this.checkedCause = checkedCause;
+    }
+
+    Exception getCheckedCause() {
+      return checkedCause;
+    }
+  }
+
+  /**
+   * Cache key for identifying blocks uniquely by file identity, block offset and block size.
+   */
+  public static class BlockCacheKey {
+
+    private final String fileIdentity;
+    private final long offset;
+    private final int size;
+
+    /**
+     * @param fileIdentity identifier of the file the block belongs to; may be null
+     * @param offset       offset of the block
+     * @param size         size of the block
+     */
+    public BlockCacheKey(String fileIdentity, long offset, int size) {
+      this.fileIdentity = fileIdentity;
+      this.offset = offset;
+      this.size = size;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      BlockCacheKey that = (BlockCacheKey) o;
+      return offset == that.offset
+          && size == that.size
+          && Objects.equals(fileIdentity, that.fileIdentity);
+    }
+
+    @Override
+    public int hashCode() {
+      // Same values as the hand-rolled form: a null identity hashes to 0, and the long offset is
+      // folded to an int via (offset ^ (offset >>> 32)).
+      int result = Objects.hashCode(fileIdentity);
+      result = 31 * result + Long.hashCode(offset);
+      result = 31 * result + size;
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return "BlockCacheKey{"
+          + "fileIdentity='" + fileIdentity + '\''
+          + ", offset=" + offset
+          + ", size=" + size
+          + '}';
+    }
+  }
+}
diff --git 
a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java 
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java
index b3be1e7d6db9..5e785bff51eb 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java
@@ -39,21 +39,22 @@ import static 
org.apache.hudi.io.hfile.HFileBlock.HFILEBLOCK_HEADER_SIZE;
 import static org.apache.hudi.io.hfile.HFileUtils.readMajorVersion;
 
 /**
- * An implementation a {@link HFileReader}.
+ * Base implementation of {@link HFileReader} without caching. This provides 
the core functionality for reading HFile format data.
  */
 public class HFileReaderImpl implements HFileReader {
-  private final SeekableDataInputStream stream;
-  private final long fileSize;
-
-  private final HFileCursor cursor;
-  private boolean isMetadataInitialized = false;
-  private HFileTrailer trailer;
-  private HFileContext context;
-  private TreeMap<Key, BlockIndexEntry> dataBlockIndexEntryMap;
-  private TreeMap<Key, BlockIndexEntry> metaBlockIndexEntryMap;
-  private HFileInfo fileInfo;
-  private Option<BlockIndexEntry> currentDataBlockEntry;
-  private Option<HFileDataBlock> currentDataBlock;
+
+  protected final SeekableDataInputStream stream;
+  protected final long fileSize;
+
+  protected final HFileCursor cursor;
+  protected boolean isMetadataInitialized = false;
+  protected HFileTrailer trailer;
+  protected HFileContext context;
+  protected TreeMap<Key, BlockIndexEntry> dataBlockIndexEntryMap;
+  protected TreeMap<Key, BlockIndexEntry> metaBlockIndexEntryMap;
+  protected HFileInfo fileInfo;
+  protected Option<BlockIndexEntry> currentDataBlockEntry;
+  protected Option<HFileDataBlock> currentDataBlock;
 
   public HFileReaderImpl(SeekableDataInputStream stream, long fileSize) {
     this.stream = stream;
@@ -149,7 +150,7 @@ public class HFileReaderImpl implements HFileReader {
         }
       }
       if (!currentDataBlockEntry.get().getNextBlockFirstKey().isPresent()) {
-        // This is the last data block.  Check against the last key.
+        // This is the last data block. Check against the last key.
         if (fileInfo.getLastKey().isPresent()) {
           int comparedLastKey = key.compareTo(fileInfo.getLastKey().get());
           if (comparedLastKey > 0) {
@@ -255,7 +256,7 @@ public class HFileReaderImpl implements HFileReader {
   public boolean isSeeked() {
     return cursor.isSeeked();
   }
-  
+
   @Override
   public void close() throws IOException {
     currentDataBlockEntry = Option.empty();
@@ -311,7 +312,14 @@ public class HFileReaderImpl implements HFileReader {
     return Option.of(keyBlockIndexEntryEntry.getValue());
   }
 
-  private HFileDataBlock instantiateHFileDataBlock(BlockIndexEntry 
blockToRead) throws IOException {
+  /**
+   * Creates an HFile data block.
+   *
+   * @param blockToRead the block index entry to read
+   * @return the instantiated HFile data block
+   * @throws IOException if there's an error reading the block
+   */
+  public HFileDataBlock instantiateHFileDataBlock(BlockIndexEntry blockToRead) 
throws IOException {
     HFileBlockReader blockReader = new HFileBlockReader(
         context, stream, blockToRead.getOffset(),
         blockToRead.getOffset() + (long) blockToRead.getSize());
@@ -326,19 +334,14 @@ public class HFileReaderImpl implements HFileReader {
   }
 
   /**
-   * Read single-level or multiple-level data block index, and load all data 
block
-   * information into memory in BFS fashion.
+   * Read single-level or multiple-level data block index, and load all data 
block information into memory in BFS fashion.
    *
-   * @param rootBlockReader a {@link HFileBlockReader} used to read root data 
index block;
-   *                        this reader will be used to read subsequent meta 
index block
-   *                        afterward
+   * @param rootBlockReader a {@link HFileBlockReader} used to read root data 
index block; this reader will be used to read subsequent meta index block 
afterward
    * @param numEntries      the number of entries in the root index block
    * @param levels          the level of the indexes
-   * @return
+   * @return single/multiple-level data block index
    */
-  private TreeMap<Key, BlockIndexEntry> readDataBlockIndex(HFileBlockReader 
rootBlockReader,
-                                                           int numEntries,
-                                                           int levels) throws 
IOException {
+  private TreeMap<Key, BlockIndexEntry> readDataBlockIndex(HFileBlockReader 
rootBlockReader, int numEntries, int levels) throws IOException {
     ValidationUtils.checkArgument(levels > 0,
         "levels of data block index must be greater than 0");
     // Parse root data index block
diff --git 
a/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileBlockCache.java 
b/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileBlockCache.java
new file mode 100644
index 000000000000..6c9c28777184
--- /dev/null
+++ b/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileBlockCache.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for {@link HFileBlockCache}: key equality, put/get, size-bounded eviction, clearing,
+ * and the compute-if-absent path.
+ */
+public class TestHFileBlockCache {
+
+  @Test
+  public void testBlockCacheBasicOperations() {
+    final HFileBlockCache blockCache = new HFileBlockCache(2, 30, TimeUnit.MINUTES);
+    assertEquals(0, blockCache.size());
+
+    // Three keys that differ only by offset
+    final HFileBlockCache.BlockCacheKey firstKey = new HFileBlockCache.BlockCacheKey(null, 100, 64);
+    final HFileBlockCache.BlockCacheKey secondKey = new HFileBlockCache.BlockCacheKey(null, 200, 64);
+    final HFileBlockCache.BlockCacheKey thirdKey = new HFileBlockCache.BlockCacheKey(null, 300, 64);
+
+    assertNotEquals(firstKey, secondKey);
+    assertEquals(new HFileBlockCache.BlockCacheKey(null, 100, 64), firstKey);
+
+    // All mock blocks share one buffer that starts with a well-formed HFile block header
+    final HFileContext blockContext = HFileContext.builder()
+        .checksumType(ChecksumType.CRC32C)
+        .blockSize(1024)
+        .build();
+    final byte[] blockBytes = createValidHFileBlockData();
+    final MockHFileDataBlock firstBlock = new MockHFileDataBlock(blockContext, blockBytes, 0);
+    final MockHFileDataBlock secondBlock = new MockHFileDataBlock(blockContext, blockBytes, 0);
+    final MockHFileDataBlock thirdBlock = new MockHFileDataBlock(blockContext, blockBytes, 0);
+
+    // Put followed by get: only the inserted key resolves
+    blockCache.putBlock(firstKey, firstBlock);
+    assertEquals(1, blockCache.size());
+    assertEquals(firstBlock, blockCache.getBlock(firstKey));
+    assertNull(blockCache.getBlock(secondKey));
+
+    // Fill the cache up to its limit of two entries
+    blockCache.putBlock(secondKey, secondBlock);
+    assertEquals(2, blockCache.size());
+
+    // A third entry exceeds the limit; Caffeine evicts lazily, so force maintenance before asserting
+    blockCache.putBlock(thirdKey, thirdBlock);
+    blockCache.cleanUp();
+
+    // Look up all three keys before checking the outcome
+    final HFileBlock firstLookup = blockCache.getBlock(firstKey);
+    final HFileBlock secondLookup = blockCache.getBlock(secondKey);
+    final HFileBlock thirdLookup = blockCache.getBlock(thirdKey);
+
+    // The first key is expected to be the one evicted.
+    // NOTE(review): the victim is chosen by Caffeine's eviction policy, and the first key was read once
+    // above - confirm this expectation does not depend on policy internals.
+    assertNull(firstLookup);
+    assertSame(secondBlock, secondLookup);
+    assertSame(thirdBlock, thirdLookup);
+
+    // Whatever was evicted, the cache must not hold more than its configured maximum
+    assertTrue(blockCache.size() <= 2, "Final cache size should not exceed maximum: " + blockCache.size());
+
+    // Clearing removes every remaining entry
+    blockCache.clear();
+    assertEquals(0, blockCache.size());
+    assertNull(blockCache.getBlock(secondKey));
+    assertNull(blockCache.getBlock(thirdKey));
+  }
+
+  @Test
+  public void testGetOrComputeWithMissAndMultipleBlocks() throws Exception {
+    final HFileBlockCache blockCache = new HFileBlockCache(10, 30, TimeUnit.MINUTES);
+    final AtomicInteger loaderCalls = new AtomicInteger(0);
+
+    // Keys and blocks used by this test
+    final HFileBlockCache.BlockCacheKey computedKey = new HFileBlockCache.BlockCacheKey("file-A", 1024, 128);
+    final HFileBlockCache.BlockCacheKey seededKey = new HFileBlockCache.BlockCacheKey("file-B", 2048, 256);
+
+    final HFileContext blockContext = HFileContext.builder().build();
+    final byte[] blockBytes = createValidHFileBlockData();
+    final MockHFileDataBlock computedBlock = new MockHFileDataBlock(blockContext, blockBytes, 0);
+    final MockHFileDataBlock seededBlock = new MockHFileDataBlock(blockContext, blockBytes, 0);
+
+    // Seed the cache so that it is not empty when the computation happens
+    blockCache.putBlock(seededKey, seededBlock);
+    assertEquals(1, blockCache.size());
+
+    // The key about to be computed must start out as a miss
+    assertNull(blockCache.getBlock(computedKey), "Key should not be in the cache initially.");
+
+    // Loader that counts how many times it actually runs
+    final Callable<HFileBlock> countingLoader = () -> {
+      loaderCalls.incrementAndGet();
+      return computedBlock;
+    };
+
+    // First getOrCompute runs the loader and stores its result
+    final HFileBlock initialResult = blockCache.getOrCompute(computedKey, countingLoader);
+    assertEquals(1, loaderCalls.get(), "Loader should be called once on first access.");
+    assertEquals(2, blockCache.size(), "Cache size should be 2 after computing the new block.");
+    assertSame(computedBlock, initialResult, "The newly computed block should be returned.");
+
+    // Second getOrCompute is served from the cache without running the loader again
+    final HFileBlock repeatedResult = blockCache.getOrCompute(computedKey, countingLoader);
+    assertEquals(1, loaderCalls.get(), "Loader should NOT be called again for a cached key.");
+    assertSame(initialResult, repeatedResult, "Repeated calls should return the exact same cached object instance.");
+
+    // The seeded entry is unaffected by the computation
+    assertSame(seededBlock, blockCache.getBlock(seededKey), "Pre-existing block should remain untouched.");
+  }
+
+  /**
+   * Builds a byte array holding an HFile block header followed by 100 bytes of dummy payload,
+   * for use as input to the {@link HFileDataBlock} constructor in these tests.
+   */
+  private static byte[] createValidHFileBlockData() {
+    final int payloadLength = 100;
+    final int blockLength = HFileBlock.HFILEBLOCK_HEADER_SIZE + payloadLength;
+
+    // Dummy payload: byte i holds the value i % 256
+    final byte[] payload = new byte[payloadLength];
+    for (int i = 0; i < payload.length; i++) {
+      payload[i] = (byte) (i % 256);
+    }
+
+    final ByteBuffer block = ByteBuffer.allocate(blockLength);
+
+    // Header fields, in on-disk order
+    block.put(HFileBlockType.DATA.getMagic()); // 8 bytes block magic
+    block.putInt(payloadLength); // onDiskSizeWithoutHeader (4 bytes)
+    block.putInt(payloadLength); // uncompressedSizeWithoutHeader (4 bytes)
+    block.putLong(0L); // prevBlockOffset (8 bytes)
+    block.put(ChecksumType.CRC32C.getCode()); // checksum type (1 byte)
+    block.putInt(16384); // bytesPerChecksum (4 bytes) - valid non-zero value
+    block.putInt(blockLength); // onDiskDataSizeWithHeader (4 bytes)
+
+    block.put(payload);
+
+    return block.array();
+  }
+
+  /**
+   * Minimal {@link HFileDataBlock} subclass that lets the tests construct data blocks directly
+   * through the superclass constructor.
+   */
+  private static class MockHFileDataBlock extends HFileDataBlock {
+
+    public MockHFileDataBlock(HFileContext context, byte[] byteBuff, int startOffsetInBuff) {
+      super(context, byteBuff, startOffsetInBuff);
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index d5a6cf995f1c..5a251f61f693 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -44,7 +44,8 @@ class RecordLevelIndexTestBase extends 
HoodieStatsIndexTestBase {
   def commonOpts: Map[String, String] = Map(
     PARTITIONPATH_FIELD.key -> "partition",
     HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
-    HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "15"
+    HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "15",
+    "hoodie.hfile.block.cache.enabled" -> "true"
   ) ++ baseOpts ++ metadataOpts
 
   val secondaryIndexOpts: Map[String, String] = Map(


Reply via email to