This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new db5e11c853cb [HUDI-7219] Add caching support for HFileBlocks (#13724)
db5e11c853cb is described below
commit db5e11c853cbfec82dea47ffff790c915ba7bde9
Author: voonhous <[email protected]>
AuthorDate: Tue Sep 16 21:47:24 2025 +0800
[HUDI-7219] Add caching support for HFileBlocks (#13724)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../hudi/common/config/HoodieReaderConfig.java | 28 +++
.../apache/hudi/io/storage/HFileReaderFactory.java | 29 +++
.../hudi/metadata/HoodieBackedTableMetadata.java | 30 ++-
.../TestHoodieNativeAvroHFileReaderCaching.java | 259 +++++++++++++++++++++
hudi-io/pom.xml | 5 +
.../hudi/io/hfile/CachingHFileReaderImpl.java | 145 ++++++++++++
.../org/apache/hudi/io/hfile/HFileBlockCache.java | 155 ++++++++++++
.../org/apache/hudi/io/hfile/HFileReaderImpl.java | 53 +++--
.../apache/hudi/io/hfile/TestHFileBlockCache.java | 178 ++++++++++++++
.../hudi/functional/RecordLevelIndexTestBase.scala | 3 +-
10 files changed, 855 insertions(+), 30 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
index 56935fc5d817..1ede383b0895 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
@@ -89,4 +89,32 @@ public class HoodieReaderConfig extends HoodieConfig {
"hoodie.write.record.merge.custom.implementation.classes";
public static final String
RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY =
"hoodie.datasource.write.record.merger.impls";
+
+  public static final ConfigProperty<Boolean> HFILE_BLOCK_CACHE_ENABLED = ConfigProperty
+      .key("hoodie.hfile.block.cache.enabled")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Enable HFile block-level caching for metadata files. This caches frequently "
+          + "accessed HFile blocks in memory to reduce I/O operations during metadata queries. "
+          + "Improves performance for workloads with repeated metadata access patterns. "
+          + "The block cache is shared by all caching HFile readers in the same JVM.");
+
+  public static final ConfigProperty<Integer> HFILE_BLOCK_CACHE_SIZE = ConfigProperty
+      .key("hoodie.hfile.block.cache.size")
+      .defaultValue(100)
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Maximum number of HFile blocks held in the HFile block cache. The cache is shared "
+          + "by all caching HFile readers in the same JVM and is sized by the first reader that creates it; "
+          + "readers configured later with a different value do not resize it. "
+          + "Higher values improve cache hit rates but consume more memory. "
+          + "Only effective when hoodie.hfile.block.cache.enabled is true.");
+
+  public static final ConfigProperty<Integer> HFILE_BLOCK_CACHE_TTL_MINUTES = ConfigProperty
+      .key("hoodie.hfile.block.cache.ttl.minutes")
+      .defaultValue(60)
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Time-to-live (TTL) in minutes for cached HFile blocks, measured from the last access: "
+          + "a block that has not been read for this duration is evicted, so idle blocks do not stay in memory "
+          + "indefinitely. Like the size, this is taken from the first reader that creates the shared cache. "
+          + "Only effective when hoodie.hfile.block.cache.enabled is true.");
+
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HFileReaderFactory.java
index 665c38c14c62..1eb84469739a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HFileReaderFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HFileReaderFactory.java
@@ -20,13 +20,17 @@
package org.apache.hudi.io.storage;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.hash.MurmurHash;
import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
import org.apache.hudi.io.ByteArraySeekableDataInputStream;
import org.apache.hudi.io.SeekableDataInputStream;
+import org.apache.hudi.io.hfile.CachingHFileReaderImpl;
import org.apache.hudi.io.hfile.HFileReader;
import org.apache.hudi.io.hfile.HFileReaderImpl;
import org.apache.hudi.storage.HoodieStorage;
@@ -42,6 +46,7 @@ public class HFileReaderFactory {
private final HoodieStorage storage;
private final HoodieMetadataConfig metadataConfig;
+  // Raw reader properties, kept so the HFile block cache settings (HoodieReaderConfig) can be read.
+ private final TypedProperties properties;
private final Either<StoragePath, byte[]> fileSource;
public HFileReaderFactory(HoodieStorage storage,
@@ -49,15 +54,39 @@ public class HFileReaderFactory {
Either<StoragePath, byte[]> fileSource) {
this.storage = storage;
this.metadataConfig =
HoodieMetadataConfig.newBuilder().withProperties(properties).build();
+ this.properties = properties;
this.fileSource = fileSource;
}
+  /**
+   * Opens an {@link HFileReader} over the configured file source.
+   *
+   * <p>When {@link HoodieReaderConfig#HFILE_BLOCK_CACHE_ENABLED} is set, a {@link CachingHFileReaderImpl}
+   * is returned, configured with {@link HoodieReaderConfig#HFILE_BLOCK_CACHE_SIZE} and
+   * {@link HoodieReaderConfig#HFILE_BLOCK_CACHE_TTL_MINUTES}; otherwise a plain {@link HFileReaderImpl}.
+   *
+   * @return a reader over the file
+   * @throws IOException if determining the file size or opening the input stream fails
+   */
public HFileReader createHFileReader() throws IOException {
final long fileSize = determineFileSize();
final SeekableDataInputStream inputStream = createInputStream(fileSize);
+
+    // Block caching is opt-in through the reader properties.
+ if (shouldEnableBlockCaching()) {
+ int blockCacheSize = ConfigUtils.getIntWithAltKeys(
+ properties, HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE);
+ int cacheTtlMinutes = ConfigUtils.getIntWithAltKeys(
+ properties, HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES);
+      // Identity used in the cache keys: the file path, or a hash-based id for in-memory content.
+ String filePath = getFilePath();
+ return new CachingHFileReaderImpl(inputStream, fileSize, filePath,
blockCacheSize, cacheTtlMinutes);
+ }
+
return new HFileReaderImpl(inputStream, fileSize);
}
+  /**
+   * Tells whether the reader properties request HFile block caching via
+   * {@link HoodieReaderConfig#HFILE_BLOCK_CACHE_ENABLED}.
+   */
+  private boolean shouldEnableBlockCaching() {
+    final boolean cachingRequested =
+        ConfigUtils.getBooleanWithAltKeys(properties, HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED);
+    return cachingRequested;
+  }
+
+  /**
+   * Returns the identity under which blocks of this file are keyed in the shared HFile block cache.
+   *
+   * <p>A storage-backed file is identified by its path. In-memory content has no path, so it is
+   * identified by its length plus two differently seeded 32-bit Murmur hashes. A single 32-bit hash
+   * is not enough: two different byte arrays whose hashes collide would share cache entries for blocks
+   * at the same offset and size, and a reader could be handed another file's block. The combined id is
+   * not cryptographic, but it makes accidental collisions between unrelated contents negligible.
+   */
+  private String getFilePath() {
+    if (fileSource.isLeft()) {
+      return fileSource.asLeft().toString();
+    }
+    byte[] content = fileSource.asRight();
+    int primaryHash = MurmurHash.getInstance().hash(content, content.length, -1);
+    int secondaryHash = MurmurHash.getInstance().hash(content, content.length, 0x5bd1e995);
+    return content.length + "_" + primaryHash + "_" + secondaryHash;
+  }
+
private long determineFileSize() throws IOException {
if (fileSource.isLeft()) {
return storage.getPathInfo(fileSource.asLeft()).getLength();
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index e8b009e3a109..ee2b4b1a41c2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -22,7 +22,9 @@ import org.apache.hudi.avro.HoodieAvroReaderContext;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
@@ -86,6 +88,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
@@ -506,14 +509,18 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
// If reuse is enabled and full scan is allowed for the partition, we can
reuse the file readers for base files and the reader context for the log files.
Map<StoragePath, HoodieAvroFileReader> baseFileReaders =
Collections.emptyMap();
ReusableFileGroupRecordBufferLoader<IndexedRecord> recordBufferLoader =
null;
- if (reuse && isFullScanAllowedForPartition(fileSlice.getPartitionPath())) {
+ boolean shouldReuse = reuse &&
isFullScanAllowedForPartition(fileSlice.getPartitionPath());
+ if (shouldReuse) {
Pair<HoodieAvroFileReader,
ReusableFileGroupRecordBufferLoader<IndexedRecord>> readers =
reusableFileReaders.computeIfAbsent(fileSlice.getFileGroupId(), fgId
-> {
try {
HoodieAvroFileReader baseFileReader = null;
if (fileSlice.getBaseFile().isPresent()) {
+ TypedProperties props =
TypedProperties.copy(metadataConfig.getProps());
+ setHFileBlockCacheProps(props);
+ HoodieConfig newConfig = new HoodieConfig(props);
baseFileReader = (HoodieAvroFileReader)
HoodieIOFactory.getIOFactory(storage).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
- .getFileReader(metadataConfig,
fileSlice.getBaseFile().get().getStoragePath(),
metadataMetaClient.getTableConfig().getBaseFileFormat(), Option.empty());
+ .getFileReader(newConfig,
fileSlice.getBaseFile().get().getStoragePath(),
metadataMetaClient.getTableConfig().getBaseFileFormat(), Option.empty());
}
return Pair.of(baseFileReader,
buildReusableRecordBufferLoader(fileSlice, latestMetadataInstantTime,
instantRange));
} catch (IOException ex) {
@@ -542,7 +549,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
.withFileSlice(fileSlice)
.withDataSchema(SCHEMA)
.withRequestedSchema(SCHEMA)
- .withProps(buildFileGroupReaderProperties(metadataConfig))
+ .withProps(buildFileGroupReaderProperties(metadataConfig, shouldReuse))
.withRecordBufferLoader(recordBufferLoader)
.withEnableOptimizedLogBlockScan(metadataConfig.isOptimizedLogBlocksScanEnabled())
.build();
@@ -855,7 +862,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
/**
* Derive necessary properties for FG reader.
*/
- TypedProperties buildFileGroupReaderProperties(HoodieMetadataConfig
metadataConfig) {
+ TypedProperties buildFileGroupReaderProperties(HoodieMetadataConfig
metadataConfig, boolean shouldReuse) {
HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder()
.fromProperties(metadataConfig.getProps()).build();
TypedProperties props = new TypedProperties();
@@ -871,6 +878,21 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
props.setProperty(
DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
Boolean.toString(commonConfig.isBitCaskDiskMapCompressionEnabled()));
+ if (shouldReuse) {
+ setHFileBlockCacheProps(props);
+ } else {
+ props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED.key(),
+
metadataConfig.getStringOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED));
+ }
+ props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE.key(),
+
metadataConfig.getStringOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE));
+ props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES.key(),
+
metadataConfig.getStringOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES));
return props;
}
+
+  /**
+   * Turns on HFile block caching in {@code props}. Called when file readers are reused for a
+   * full-scan partition, i.e. when {@code shouldReuse} holds.
+   */
+  private void setHFileBlockCacheProps(Properties props) {
+    final String cacheEnabledKey = HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED.key();
+    props.setProperty(cacheEnabledKey, "true");
+  }
}
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieNativeAvroHFileReaderCaching.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieNativeAvroHFileReaderCaching.java
new file mode 100644
index 000000000000..2b86245ce824
--- /dev/null
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieNativeAvroHFileReaderCaching.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.io.hadoop.HoodieAvroHFileWriter;
+import org.apache.hudi.io.hadoop.TestHoodieOrcReaderWriter;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.function.Supplier;
+
+import static
org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Local performance comparison of HFile point lookups with and without the HFile block cache.
+ * The test is disabled by default and meant to be enabled manually for local measurements.
+ */
+public class TestHoodieNativeAvroHFileReaderCaching {
+
+  @TempDir
+  public static Path tempDir;
+
+  // Fixed seed for reproducible tests
+  private static final Random RANDOM = new Random(42);
+  private static final int KEYS_TO_LOOKUP = 10_000;
+  private static final int WARMUP_KEYS = 1_000;
+  private static final List<String> EXISTING_KEYS = new ArrayList<>();
+  private static final List<String> MISSING_KEYS = new ArrayList<>();
+  private static HoodieStorage storage;
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    storage = HoodieTestUtils.getStorage(getFilePath());
+    Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchemaWithMetaFields.avsc");
+    HoodieAvroHFileWriter writer = createWriter(avroSchema, true);
+
+    // Write enough records to span many HFile blocks for realistic testing
+    final int numRecords = 50_000;
+    System.out.println("Creating HFile with " + numRecords + " records...");
+
+    try {
+      for (int i = 0; i < numRecords; i++) {
+        String key = String.format("key_%08d", i);
+        EXISTING_KEYS.add(key);
+
+        GenericRecord record = new GenericData.Record(avroSchema);
+        record.put("_row_key", key);
+        record.put("time", Integer.toString(RANDOM.nextInt()));
+        record.put("number", i);
+
+        writer.writeAvroWithMetadata(
+            new HoodieAvroRecord(new HoodieKey(key, "partition_" + (i % 10)),
+                new EmptyHoodieRecordPayload()).getKey(), record);
+      }
+    } finally {
+      writer.close();
+    }
+
+    // Generate missing keys that don't exist in the HFile
+    for (int i = 0; i < KEYS_TO_LOOKUP; i++) {
+      MISSING_KEYS.add(String.format("missing_key_%08d", i + numRecords));
+    }
+
+    System.out.println("HFile created with " + EXISTING_KEYS.size() + " existing keys");
+    System.out.println("Generated " + MISSING_KEYS.size() + " missing keys for testing");
+  }
+
+  @Test
+  @Disabled("Enable this for local performance tests")
+  public void testBlockCachePerformanceOnRecordLevelIndex() throws Exception {
+    System.out.println("\n=== HFile BlockCache Performance Test ===");
+
+    // Test existing keys lookup performance
+    testExistingKeysLookup();
+
+    // Test missing keys lookup performance
+    testMissingKeysLookup();
+
+    System.out.println("================================================================\n");
+  }
+
+  private void testExistingKeysLookup() throws Exception {
+    System.out.println("\n--- Testing " + KEYS_TO_LOOKUP + " Existing Key Lookups ---");
+
+    // Select random existing keys, sorted; work on a copy so the shared EXISTING_KEYS list is not reordered
+    List<String> shuffledKeys = new ArrayList<>(EXISTING_KEYS);
+    Collections.shuffle(shuffledKeys, RANDOM);
+    List<String> testKeys = new ArrayList<>(shuffledKeys.subList(0, KEYS_TO_LOOKUP));
+    testKeys.sort(String::compareTo);
+
+    // Warm up JVM
+    warmupReads(testKeys.subList(0, WARMUP_KEYS));
+
+    // Measure without cache first, then with cache (same order as the missing-key test)
+    long noCacheTime = measureLookupTime(testKeys, false, false, true);
+    long cacheTime = measureLookupTime(testKeys, true, false, true);
+
+    double speedup = computeSpeedup(noCacheTime, cacheTime);
+    printResults("Existing", noCacheTime, cacheTime, speedup);
+
+    assertTrue(speedup > 1.0, "BlockCache should provide speedup for existing key lookups");
+  }
+
+  private void testMissingKeysLookup() throws Exception {
+    System.out.println("\n--- Testing " + KEYS_TO_LOOKUP + " Missing Key Lookups ---");
+
+    // Use all generated missing keys
+    List<String> testKeys = new ArrayList<>(MISSING_KEYS);
+
+    // Warm up JVM
+    warmupReads(testKeys.subList(0, WARMUP_KEYS));
+
+    long noCacheTime = measureLookupTime(testKeys, false, false, false);
+    long cacheTime = measureLookupTime(testKeys, true, false, false);
+
+    double speedup = computeSpeedup(noCacheTime, cacheTime);
+    printResults("Missing", noCacheTime, cacheTime, speedup);
+
+    // Missing keys may not benefit as much from caching but should not be slower
+    assertTrue(speedup >= 0.8, "BlockCache should not significantly slow down missing key lookups");
+  }
+
+  // Ratio of uncached to cached time; the divisor is clamped to 1 ms so a very fast cached run cannot divide by zero.
+  private static double computeSpeedup(long noCacheTime, long cacheTime) {
+    return (double) noCacheTime / Math.max(1L, cacheTime);
+  }
+
+  // Prints one comparison summary for the given kind of keys ("Existing" / "Missing").
+  private static void printResults(String keyKind, long noCacheTime, long cacheTime, double speedup) {
+    System.out.printf("%d %s Key Lookups:%n", KEYS_TO_LOOKUP, keyKind);
+    System.out.printf("  - Without BlockCache: %d ms%n", noCacheTime);
+    System.out.printf("  - With BlockCache: %d ms%n", cacheTime);
+    System.out.printf("  - Speedup: %.2fx%n", speedup);
+    System.out.printf("  - Performance Improvement: %.1f%%%n", (speedup - 1) * 100);
+  }
+
+  private void warmupReads(List<String> keys) throws Exception {
+    // Warm up the JVM (class loading, JIT) to reduce noise in the measurements that follow
+    try (HoodieAvroHFileReaderImplBase reader = createReader(storage, true, true);
+         ClosableIterator<IndexedRecord> iter = reader.getIndexedRecordsByKeysIterator(keys, reader.getSchema())) {
+      // Consume all records
+      toStream(iter).forEach(record -> {
+      });
+    }
+  }
+
+  /**
+   * Looks up each key with a freshly created reader and returns the elapsed wall time in milliseconds.
+   *
+   * @param expectRecords when true, asserts that at least one record was found
+   */
+  private long measureLookupTime(List<String> keys, boolean enableCache, boolean useBloomFilter,
+                                 boolean expectRecords) throws Exception {
+    if (enableCache) {
+      // Start from an empty shared cache so blocks loaded during warm-up or earlier runs do not skew the result
+      org.apache.hudi.io.hfile.CachingHFileReaderImpl.resetGlobalCache();
+    }
+    // Encourage a garbage collection before measuring to reduce noise
+    System.gc();
+    Thread.sleep(100);
+
+    long startTime = System.nanoTime();
+    int totalRecordCount = 0;
+
+    // Look up each key individually to test realistic usage pattern
+    for (String key : keys) {
+      try (HoodieAvroHFileReaderImplBase reader = createReader(storage, useBloomFilter, enableCache);
+           ClosableIterator<IndexedRecord> iter =
+               reader.getIndexedRecordsByKeysIterator(Collections.singletonList(key), reader.getSchema())) {
+        while (iter.hasNext()) {
+          iter.next();
+          totalRecordCount++;
+        }
+      }
+    }
+    long elapsedMs = (System.nanoTime() - startTime) / 1_000_000;
+
+    if (expectRecords) {
+      assertTrue(totalRecordCount > 0, "Should find existing records");
+    }
+    return elapsedMs;
+  }
+
+  // Helper methods
+
+  private static HoodieAvroHFileWriter createWriter(Schema avroSchema, boolean populateMetaFields) throws Exception {
+    String instantTime = "000";
+    HoodieStorage storage = HoodieTestUtils.getStorage(getFilePath());
+    Properties props = new Properties();
+    props.setProperty(HoodieTableConfig.POPULATE_META_FIELDS.key(), Boolean.toString(populateMetaFields));
+    TaskContextSupplier mockTaskContextSupplier = mock(TaskContextSupplier.class);
+    Supplier<Integer> partitionSupplier = mock(Supplier.class);
+    when(mockTaskContextSupplier.getPartitionIdSupplier()).thenReturn(partitionSupplier);
+    when(partitionSupplier.get()).thenReturn(10);
+
+    return (HoodieAvroHFileWriter) HoodieFileWriterFactory.getFileWriter(
+        instantTime, getFilePath(), storage, HoodieStorageConfig.newBuilder().fromProperties(props).build(), avroSchema,
+        mockTaskContextSupplier, HoodieRecord.HoodieRecordType.AVRO);
+  }
+
+  private static StoragePath getFilePath() {
+    return new StoragePath(tempDir.toString() + "/perf_test.hfile");
+  }
+
+  private HoodieAvroHFileReaderImplBase createReader(HoodieStorage storage, boolean useBloomFilter,
+                                                     boolean enableCache) throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED.key(), String.valueOf(enableCache));
+    // Use a cache that can hold 100 blocks
+    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE.key(), String.valueOf(100));
+
+    HFileReaderFactory readerFactory = HFileReaderFactory.builder()
+        .withStorage(storage)
+        .withPath(getFilePath())
+        .withProps(props)
+        .build();
+    return HoodieNativeAvroHFileReader.builder()
+        .readerFactory(readerFactory).path(getFilePath()).useBloomFilter(useBloomFilter).build();
+  }
+}
diff --git a/hudi-io/pom.xml b/hudi-io/pom.xml
index c2a08e7e22fb..5aa7aff0c97f 100644
--- a/hudi-io/pom.xml
+++ b/hudi-io/pom.xml
@@ -135,6 +135,11 @@
<artifactId>protobuf-java</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ </dependency>
+
<dependency>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/CachingHFileReaderImpl.java b/hudi-io/src/main/java/org/apache/hudi/io/hfile/CachingHFileReaderImpl.java
new file mode 100644
index 000000000000..3c4d4b055f8e
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/CachingHFileReaderImpl.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.io.SeekableDataInputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * HFile reader that serves data blocks through a shared {@link HFileBlockCache}. It extends
+ * {@link HFileReaderImpl} and overrides only {@link #instantiateHFileDataBlock(BlockIndexEntry)};
+ * all other read logic is inherited unchanged.
+ * <p>
+ * All instances share one static cache, so readers that open the same file identity
+ * ({@code filePath}) can reuse blocks loaded by one another. The cache is created with the size and
+ * TTL passed to the first reader; readers constructed later with different values reuse the existing
+ * cache, and a warning about the ignored configuration is logged.
+ */
+public class CachingHFileReaderImpl extends HFileReaderImpl {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CachingHFileReaderImpl.class);
+
+  private static volatile HFileBlockCache GLOBAL_BLOCK_CACHE;
+  // Configuration the global cache was created with; used to detect ignored configurations.
+  private static volatile Integer INITIAL_CACHE_SIZE;
+  private static volatile Integer INITIAL_CACHE_TTL;
+  // Keeps the "configuration ignored" warning from being logged for every reader (benign races aside).
+  private static volatile boolean CONFIG_MISMATCH_LOGGED = false;
+  private static final Object CACHE_LOCK = new Object();
+
+  private final String filePath;
+  // Captured at construction so a concurrent resetGlobalCache() cannot null the cache under an open reader.
+  private final HFileBlockCache blockCache;
+
+  /**
+   * Creates a reader whose data blocks are looked up in, and loaded into, the shared block cache.
+   *
+   * @param stream          input stream over the HFile content
+   * @param fileSize        size of the HFile in bytes
+   * @param filePath        identity of the file used in cache keys; files with different content must use different identities
+   * @param cacheSize       maximum number of cached blocks; only used if the shared cache does not exist yet
+   * @param cacheTtlMinutes cache TTL in minutes; only used if the shared cache does not exist yet
+   */
+  public CachingHFileReaderImpl(SeekableDataInputStream stream, long fileSize, String filePath, int cacheSize, int cacheTtlMinutes) {
+    super(stream, fileSize);
+    this.filePath = filePath;
+    this.blockCache = getGlobalCache(cacheSize, cacheTtlMinutes);
+  }
+
+  /**
+   * Returns the cache shared by all {@code CachingHFileReaderImpl} instances, creating it with the given
+   * configuration on first use. Creation happens under {@code CACHE_LOCK}; once the volatile field is
+   * set, callers take the lock-free path. A differing configuration is reported, never applied.
+   */
+  private static HFileBlockCache getGlobalCache(int cacheSize, int cacheTtlMinutes) {
+    HFileBlockCache cache = GLOBAL_BLOCK_CACHE;
+    if (cache == null) {
+      synchronized (CACHE_LOCK) {
+        cache = GLOBAL_BLOCK_CACHE;
+        if (cache == null) {
+          LOG.info("Initializing global HFileBlockCache with size: {}, TTL: {} minutes.",
+              cacheSize, cacheTtlMinutes);
+          // Record the initialization config before publishing the cache through the volatile field
+          INITIAL_CACHE_SIZE = cacheSize;
+          INITIAL_CACHE_TTL = cacheTtlMinutes;
+          CONFIG_MISMATCH_LOGGED = false;
+          cache = new HFileBlockCache(cacheSize, cacheTtlMinutes, TimeUnit.MINUTES);
+          GLOBAL_BLOCK_CACHE = cache;
+          return cache;
+        }
+      }
+    }
+    warnIfConfigIgnored(cacheSize, cacheTtlMinutes);
+    return cache;
+  }
+
+  /**
+   * Logs a warning when the requested configuration differs from the one the shared cache was created
+   * with, since the requested values are silently not applied otherwise.
+   */
+  private static void warnIfConfigIgnored(int cacheSize, int cacheTtlMinutes) {
+    if (CONFIG_MISMATCH_LOGGED) {
+      return;
+    }
+    Integer initialSize = INITIAL_CACHE_SIZE;
+    Integer initialTtl = INITIAL_CACHE_TTL;
+    // Null while resetGlobalCache() runs concurrently; there is nothing to compare against then.
+    if (initialSize == null || initialTtl == null) {
+      return;
+    }
+    if (initialSize.intValue() != cacheSize || initialTtl.intValue() != cacheTtlMinutes) {
+      CONFIG_MISMATCH_LOGGED = true;
+      LOG.warn("HFile block cache is already initialized. The provided configuration is being ignored. "
+              + "Existing config: [Size: {}, TTL: {} mins], Ignored config: [Size: {}, TTL: {} mins].",
+          initialSize, initialTtl, cacheSize, cacheTtlMinutes);
+    }
+  }
+
+  /**
+   * Returns the data block for {@code blockToRead}, reading it through the parent implementation only
+   * when it is not already cached under this file's identity, offset and size.
+   *
+   * @throws IOException if reading the block fails
+   */
+  @Override
+  public HFileDataBlock instantiateHFileDataBlock(BlockIndexEntry blockToRead) throws IOException {
+    HFileBlockCache.BlockCacheKey cacheKey = new HFileBlockCache.BlockCacheKey(
+        filePath, blockToRead.getOffset(), blockToRead.getSize());
+
+    try {
+      HFileBlock block = blockCache.getOrCompute(cacheKey, () -> super.instantiateHFileDataBlock(blockToRead));
+      return (HFileDataBlock) block;
+    } catch (IOException e) {
+      throw e;
+    } catch (RuntimeException e) {
+      // A cache implementation may tunnel the loader's IOException through a plain RuntimeException;
+      // unwrap it so callers see the same checked exception as with HFileReaderImpl.
+      if (e.getClass() == RuntimeException.class && e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      }
+      throw e;
+    } catch (Exception e) {
+      throw new IOException("Failed to load HFile block", e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    // NOTE: Do not clear the shared cache when closing individual readers
+    // The cache is shared across all instances
+    super.close();
+  }
+
+  /**
+   * Gets current cache size from the global cache.
+   *
+   * @return number of cached blocks, or 0 if the global cache does not exist
+   */
+  public long getCacheSize() {
+    HFileBlockCache cache = GLOBAL_BLOCK_CACHE;
+    return cache != null ? cache.size() : 0;
+  }
+
+  /**
+   * Clears the global block cache. This affects every reader sharing it, not only this instance.
+   */
+  public void clearCache() {
+    HFileBlockCache cache = GLOBAL_BLOCK_CACHE;
+    if (cache != null) {
+      cache.clear();
+    }
+  }
+
+  /**
+   * Gets cache statistics for monitoring optimization effectiveness.
+   *
+   * @return string representation of cache statistics
+   */
+  public String getCacheStats() {
+    return "HFileReader Cache Stats - Size: " + getCacheSize();
+  }
+
+  /**
+   * Clears and discards the global cache so the next reader recreates it. Should only be used for testing.
+   */
+  public static void resetGlobalCache() {
+    synchronized (CACHE_LOCK) {
+      if (GLOBAL_BLOCK_CACHE != null) {
+        GLOBAL_BLOCK_CACHE.clear();
+        GLOBAL_BLOCK_CACHE = null;
+      }
+      INITIAL_CACHE_SIZE = null;
+      INITIAL_CACHE_TTL = null;
+      CONFIG_MISMATCH_LOGGED = false;
+    }
+  }
+}
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockCache.java b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockCache.java
new file mode 100644
index 000000000000..512358da7487
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockCache.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Bounded in-memory cache of HFile blocks, used to avoid reading and decoding the same block repeatedly.
+ * Backed by a Caffeine cache that holds at most {@code maxCacheSize} blocks (victims are chosen by
+ * Caffeine's size-based eviction policy) and drops blocks that have not been accessed for the configured
+ * duration. Safe for concurrent access.
+ */
+public class HFileBlockCache {
+
+  private final Cache<BlockCacheKey, HFileBlock> cache;
+
+  /**
+   * Creates a block cache.
+   *
+   * @param maxCacheSize      maximum number of blocks to hold
+   * @param expireAfterAccess how long a block may go unaccessed before it is evicted
+   * @param timeUnit          unit of {@code expireAfterAccess}
+   */
+  public HFileBlockCache(int maxCacheSize, long expireAfterAccess, TimeUnit timeUnit) {
+    this.cache = Caffeine.newBuilder()
+        .maximumSize(maxCacheSize)
+        .expireAfterAccess(Duration.ofMillis(timeUnit.toMillis(expireAfterAccess)))
+        .build();
+  }
+
+  /**
+   * Gets a block from cache.
+   *
+   * @param key the cache key
+   * @return cached block or null if not found
+   */
+  public HFileBlock getBlock(BlockCacheKey key) {
+    return cache.getIfPresent(key);
+  }
+
+  /**
+   * Puts a block into cache.
+   *
+   * @param key   the cache key
+   * @param block the block to cache
+   */
+  public void putBlock(BlockCacheKey key, HFileBlock block) {
+    cache.put(key, block);
+  }
+
+  /**
+   * Gets a block from cache, or computes and caches it if not present. If the loader fails, nothing is cached
+   * and the loader's exception is rethrown unchanged (checked exceptions such as {@code IOException} included).
+   *
+   * @param key    the cache key
+   * @param loader callable to load the block if not in cache
+   * @return cached or newly computed block
+   * @throws Exception if the loader throws an exception
+   */
+  public HFileBlock getOrCompute(BlockCacheKey key, Callable<HFileBlock> loader) throws Exception {
+    try {
+      // Caffeine takes a Function, which cannot throw checked exceptions: tunnel them through
+      // LoaderException and rethrow the original below.
+      return cache.get(key, k -> {
+        try {
+          return loader.call();
+        } catch (RuntimeException e) {
+          throw e;
+        } catch (Exception e) {
+          throw new LoaderException(e);
+        }
+      });
+    } catch (LoaderException e) {
+      throw e.getCheckedCause();
+    }
+  }
+
+  /**
+   * Clears all cached blocks.
+   */
+  public void clear() {
+    cache.invalidateAll();
+  }
+
+  /**
+   * Gets current cache size.
+   *
+   * @return estimated number of cached blocks
+   */
+  public long size() {
+    return cache.estimatedSize();
+  }
+
+  /**
+   * Forces cache maintenance operations like eviction.
+   * This is useful for testing to ensure consistent behavior.
+   */
+  public void cleanUp() {
+    cache.cleanUp();
+  }
+
+  /**
+   * Carries a checked exception thrown by a block loader through Caffeine's mapping function.
+   */
+  private static final class LoaderException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+
+    LoaderException(Exception cause) {
+      super(cause);
+    }
+
+    Exception getCheckedCause() {
+      return (Exception) getCause();
+    }
+  }
+
+  /**
+   * Cache key for identifying blocks uniquely: file identity, block offset and block size.
+   */
+  public static class BlockCacheKey {
+
+    private final String fileIdentity;
+    private final long offset;
+    private final int size;
+
+    public BlockCacheKey(String fileIdentity, long offset, int size) {
+      this.fileIdentity = fileIdentity;
+      this.offset = offset;
+      this.size = size;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      BlockCacheKey that = (BlockCacheKey) o;
+      return offset == that.offset
+          && size == that.size
+          && Objects.equals(fileIdentity, that.fileIdentity);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = Objects.hashCode(fileIdentity);
+      result = 31 * result + Long.hashCode(offset);
+      result = 31 * result + size;
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return "BlockCacheKey{"
+          + "fileIdentity='" + fileIdentity + '\''
+          + ", offset=" + offset
+          + ", size=" + size
+          + '}';
+    }
+  }
+}
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java
index b3be1e7d6db9..5e785bff51eb 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java
@@ -39,21 +39,22 @@ import static
org.apache.hudi.io.hfile.HFileBlock.HFILEBLOCK_HEADER_SIZE;
import static org.apache.hudi.io.hfile.HFileUtils.readMajorVersion;
/**
- * An implementation a {@link HFileReader}.
+ * Base implementation of {@link HFileReader} without caching. This provides
the core functionality for reading HFile format data.
*/
public class HFileReaderImpl implements HFileReader {
- private final SeekableDataInputStream stream;
- private final long fileSize;
-
- private final HFileCursor cursor;
- private boolean isMetadataInitialized = false;
- private HFileTrailer trailer;
- private HFileContext context;
- private TreeMap<Key, BlockIndexEntry> dataBlockIndexEntryMap;
- private TreeMap<Key, BlockIndexEntry> metaBlockIndexEntryMap;
- private HFileInfo fileInfo;
- private Option<BlockIndexEntry> currentDataBlockEntry;
- private Option<HFileDataBlock> currentDataBlock;
+
+ protected final SeekableDataInputStream stream;
+ protected final long fileSize;
+
+ protected final HFileCursor cursor;
+ protected boolean isMetadataInitialized = false;
+ protected HFileTrailer trailer;
+ protected HFileContext context;
+ protected TreeMap<Key, BlockIndexEntry> dataBlockIndexEntryMap;
+ protected TreeMap<Key, BlockIndexEntry> metaBlockIndexEntryMap;
+ protected HFileInfo fileInfo;
+ protected Option<BlockIndexEntry> currentDataBlockEntry;
+ protected Option<HFileDataBlock> currentDataBlock;
public HFileReaderImpl(SeekableDataInputStream stream, long fileSize) {
this.stream = stream;
@@ -149,7 +150,7 @@ public class HFileReaderImpl implements HFileReader {
}
}
if (!currentDataBlockEntry.get().getNextBlockFirstKey().isPresent()) {
- // This is the last data block. Check against the last key.
+ // This is the last data block. Check against the last key.
if (fileInfo.getLastKey().isPresent()) {
int comparedLastKey = key.compareTo(fileInfo.getLastKey().get());
if (comparedLastKey > 0) {
@@ -255,7 +256,7 @@ public class HFileReaderImpl implements HFileReader {
public boolean isSeeked() {
return cursor.isSeeked();
}
-
+
@Override
public void close() throws IOException {
currentDataBlockEntry = Option.empty();
@@ -311,7 +312,14 @@ public class HFileReaderImpl implements HFileReader {
return Option.of(keyBlockIndexEntryEntry.getValue());
}
- private HFileDataBlock instantiateHFileDataBlock(BlockIndexEntry
blockToRead) throws IOException {
+ /**
+ * Creates an HFile data block.
+ *
+ * @param blockToRead the block index entry to read
+ * @return the instantiated HFile data block
+ * @throws IOException if there's an error reading the block
+ */
+ public HFileDataBlock instantiateHFileDataBlock(BlockIndexEntry blockToRead)
throws IOException {
HFileBlockReader blockReader = new HFileBlockReader(
context, stream, blockToRead.getOffset(),
blockToRead.getOffset() + (long) blockToRead.getSize());
@@ -326,19 +334,14 @@ public class HFileReaderImpl implements HFileReader {
}
/**
- * Read single-level or multiple-level data block index, and load all data
block
- * information into memory in BFS fashion.
+ * Read single-level or multiple-level data block index, and load all data
block information into memory in BFS fashion.
*
- * @param rootBlockReader a {@link HFileBlockReader} used to read root data
index block;
- * this reader will be used to read subsequent meta
index block
- * afterward
+ * @param rootBlockReader a {@link HFileBlockReader} used to read root data
index block; this reader will be used to read subsequent meta index block
afterward
* @param numEntries the number of entries in the root index block
* @param levels the level of the indexes
- * @return
+ * @return single/multiple-level data block index
*/
- private TreeMap<Key, BlockIndexEntry> readDataBlockIndex(HFileBlockReader
rootBlockReader,
- int numEntries,
- int levels) throws
IOException {
+ private TreeMap<Key, BlockIndexEntry> readDataBlockIndex(HFileBlockReader
rootBlockReader, int numEntries, int levels) throws IOException {
ValidationUtils.checkArgument(levels > 0,
"levels of data block index must be greater than 0");
// Parse root data index block
diff --git
a/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileBlockCache.java
b/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileBlockCache.java
new file mode 100644
index 000000000000..6c9c28777184
--- /dev/null
+++ b/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileBlockCache.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for HFile block caching functionality.
+ */
+public class TestHFileBlockCache {
+
+  @Test
+  public void testBlockCacheBasicOperations() {
+    HFileBlockCache cache = new HFileBlockCache(2, 30, TimeUnit.MINUTES);
+    assertEquals(0, cache.size());
+
+    // Test cache key equality: keys that differ only in offset are distinct,
+    // and keys built from identical components are equal.
+    HFileBlockCache.BlockCacheKey key1 = new HFileBlockCache.BlockCacheKey(null, 100, 64);
+    HFileBlockCache.BlockCacheKey key2 = new HFileBlockCache.BlockCacheKey(null, 200, 64);
+    HFileBlockCache.BlockCacheKey key3 = new HFileBlockCache.BlockCacheKey(null, 300, 64);
+
+    assertNotEquals(key1, key2);
+    assertEquals(new HFileBlockCache.BlockCacheKey(null, 100, 64), key1);
+
+    // Create test blocks using mock implementation with valid HFile block data
+    HFileContext context = HFileContext.builder()
+        .checksumType(ChecksumType.CRC32C)
+        .blockSize(1024)
+        .build();
+
+    // Create valid HFile block data with proper header
+    byte[] validBlockData = createValidHFileBlockData();
+    MockHFileDataBlock block1 = new MockHFileDataBlock(context, validBlockData, 0);
+    MockHFileDataBlock block2 = new MockHFileDataBlock(context, validBlockData, 0);
+    MockHFileDataBlock block3 = new MockHFileDataBlock(context, validBlockData, 0);
+
+    // Test put and get
+    cache.putBlock(key1, block1);
+    assertEquals(1, cache.size());
+    assertEquals(block1, cache.getBlock(key1));
+    assertNull(cache.getBlock(key2));
+
+    // Test the size limit: a third insert into a cache bounded at 2 entries must evict one entry
+    cache.putBlock(key2, block2);
+    assertEquals(2, cache.size());
+
+    cache.putBlock(key3, block3);
+
+    // Caffeine performs eviction lazily; force pending maintenance so the eviction has happened
+    cache.cleanUp();
+
+    HFileBlock result1 = cache.getBlock(key1);
+    HFileBlock result2 = cache.getBlock(key2);
+    HFileBlock result3 = cache.getBlock(key3);
+
+    // key1 is expected to be the evicted entry. NOTE: this is NOT strict LFU -- key1 is the only
+    // entry that was successfully read above, so it is not the least frequently used one. The
+    // victim is chosen by Caffeine's Window TinyLFU policy, so this expectation depends on
+    // Caffeine's eviction internals for a tiny capacity and may change across Caffeine versions.
+    assertNull(result1);
+    assertSame(block2, result2);
+    assertSame(block3, result3);
+
+    // Verify final cache state - should contain at most 2 items
+    assertTrue(cache.size() <= 2, "Final cache size should not exceed maximum: " + cache.size());
+
+    // Test clear
+    cache.clear();
+    assertEquals(0, cache.size());
+    assertNull(cache.getBlock(key2));
+    assertNull(cache.getBlock(key3));
+  }
+
+  @Test
+  public void testGetOrComputeWithMissAndMultipleBlocks() throws Exception {
+    HFileBlockCache cache = new HFileBlockCache(10, 30, TimeUnit.MINUTES);
+    AtomicInteger loaderExecutionCount = new AtomicInteger(0);
+
+    // 0. Define keys and blocks for the test
+    HFileBlockCache.BlockCacheKey keyToCompute = new HFileBlockCache.BlockCacheKey("file-A", 1024, 128);
+    HFileBlockCache.BlockCacheKey preExistingKey = new HFileBlockCache.BlockCacheKey("file-B", 2048, 256);
+
+    HFileContext context = HFileContext.builder().build();
+    byte[] validBlockData = createValidHFileBlockData();
+    MockHFileDataBlock blockToCompute = new MockHFileDataBlock(context, validBlockData, 0);
+    MockHFileDataBlock preExistingBlock = new MockHFileDataBlock(context, validBlockData, 0);
+
+    // 1. Add a pre-existing block to ensure the cache is not empty
+    cache.putBlock(preExistingKey, preExistingBlock);
+    assertEquals(1, cache.size());
+
+    // 2. Verify a cache miss for the key we are about to compute
+    assertNull(cache.getBlock(keyToCompute), "Key should not be in the cache initially.");
+
+    // 3. Define the loader which increments a counter on execution
+    Callable<HFileBlock> loader = () -> {
+      loaderExecutionCount.incrementAndGet();
+      return blockToCompute;
+    };
+
+    // 4. First call to getOrCompute: should execute the loader
+    HFileBlock firstResult = cache.getOrCompute(keyToCompute, loader);
+    assertEquals(1, loaderExecutionCount.get(), "Loader should be called once on first access.");
+    assertEquals(2, cache.size(), "Cache size should be 2 after computing the new block.");
+    assertSame(blockToCompute, firstResult, "The newly computed block should be returned.");
+
+    // 5. Second call: should return the cached instance without executing the loader
+    HFileBlock secondResult = cache.getOrCompute(keyToCompute, loader);
+    assertEquals(1, loaderExecutionCount.get(), "Loader should NOT be called again for a cached key.");
+    assertSame(firstResult, secondResult, "Repeated calls should return the exact same cached object instance.");
+
+    // 6. Final check: ensure the pre-existing block is still accessible
+    assertSame(preExistingBlock, cache.getBlock(preExistingKey), "Pre-existing block should remain untouched.");
+  }
+
+  /**
+   * Creates a valid HFile block data with proper header structure for testing. This mimics the structure
+   * expected by HFileBlock constructor.
+   */
+  private static byte[] createValidHFileBlockData() {
+    final int headerSize = HFileBlock.HFILEBLOCK_HEADER_SIZE;
+    final int dataSize = 100;
+    final int totalSize = headerSize + dataSize;
+
+    ByteBuffer buffer = ByteBuffer.allocate(totalSize);
+
+    // Write HFile block header.
+    // NOTE(review): the fields below total 8+4+4+8+1+4+4 = 33 bytes; this assumes that equals
+    // HFileBlock.HFILEBLOCK_HEADER_SIZE, otherwise the dummy data would not start at headerSize.
+    buffer.put(HFileBlockType.DATA.getMagic()); // 8 bytes block magic
+    buffer.putInt(dataSize); // onDiskSizeWithoutHeader (4 bytes)
+    buffer.putInt(dataSize); // uncompressedSizeWithoutHeader (4 bytes)
+    buffer.putLong(0L); // prevBlockOffset (8 bytes)
+    buffer.put(ChecksumType.CRC32C.getCode()); // checksum type (1 byte)
+    buffer.putInt(16384); // bytesPerChecksum (4 bytes) - valid non-zero value
+    buffer.putInt(totalSize); // onDiskDataSizeWithHeader (4 bytes)
+
+    // Fill with dummy data
+    for (int i = 0; i < dataSize; i++) {
+      buffer.put((byte) (i % 256));
+    }
+
+    return buffer.array();
+  }
+
+  /**
+   * Mock implementation of HFileDataBlock for testing purposes. Extends HFileDataBlock to provide access
+   * to protected constructor.
+   */
+  private static class MockHFileDataBlock extends HFileDataBlock {
+
+    public MockHFileDataBlock(HFileContext context, byte[] byteBuff, int startOffsetInBuff) {
+      super(context, byteBuff, startOffsetInBuff);
+    }
+  }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index d5a6cf995f1c..5a251f61f693 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -44,7 +44,8 @@ class RecordLevelIndexTestBase extends
HoodieStatsIndexTestBase {
def commonOpts: Map[String, String] = Map(
PARTITIONPATH_FIELD.key -> "partition",
HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
- HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "15"
+ HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "15",
+ "hoodie.hfile.block.cache.enabled" -> "true"
) ++ baseOpts ++ metadataOpts
val secondaryIndexOpts: Map[String, String] = Map(