This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 2cf7f8822134ce7e15acf1434b9d13075a15fd05
Author: Jon Vexler <[email protected]>
AuthorDate: Wed May 15 06:50:00 2024 -0700

    [HUDI-7744] Introduce IOFactory and a config to set the factory (#11192)
    
    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../org/apache/hudi/index/HoodieIndexUtils.java    |  4 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  5 +-
 .../java/org/apache/hudi/io/HoodieReadHandle.java  |  6 +-
 .../table/action/commit/HoodieMergeHelper.java     |  9 +--
 .../GenericRecordValidationTestUtils.java          |  7 ++-
 .../run/strategy/JavaExecutionStrategy.java        |  6 +-
 .../hudi/client/TestJavaHoodieBackedMetadata.java  |  6 +-
 .../MultipleSparkJobExecutionStrategy.java         |  8 +--
 .../strategy/SingleSparkJobExecutionStrategy.java  |  5 +-
 .../hudi/io/storage/HoodieSparkIOFactory.java      | 49 ++++++++++++++++
 .../bootstrap/ParquetBootstrapMetadataHandler.java |  4 +-
 .../functional/TestHoodieBackedMetadata.java       | 10 ++--
 .../functional/TestHoodieBackedTableMetadata.java  |  4 +-
 .../hudi/common/config/HoodieStorageConfig.java    |  8 +++
 .../table/log/block/HoodieHFileDataBlock.java      | 18 +++---
 .../table/log/block/HoodieParquetDataBlock.java    |  4 +-
 .../table/timeline/HoodieArchivedTimeline.java     |  2 +
 .../hudi/io/storage/HoodieFileReaderFactory.java   | 27 ---------
 .../hudi/io/storage/HoodieFileWriterFactory.java   | 28 +--------
 .../apache/hudi/io/storage/HoodieIOFactory.java    | 51 ++++++++++++++++
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  4 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 14 ++---
 .../hudi/sink/clustering/ClusteringOperator.java   |  7 ++-
 .../org/apache/hudi/common/util/HFileUtils.java    |  5 +-
 .../hudi/io/storage/HoodieHadoopIOFactory.java     | 68 ++++++++++++++++++++++
 .../io/hadoop/TestHoodieAvroFileReaderFactory.java |  8 ++-
 .../hudi/io/hadoop/TestHoodieOrcReaderWriter.java  |  4 +-
 .../hudi/hadoop/HoodieHFileRecordReader.java       |  8 ++-
 .../utils/HoodieRealtimeRecordReaderUtils.java     |  8 ++-
 .../reader/DFSHoodieDatasetInputReader.java        |  5 +-
 .../main/scala/org/apache/hudi/DefaultSource.scala |  6 +-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  4 +-
 .../utilities/HoodieMetadataTableValidator.java    |  4 +-
 33 files changed, 276 insertions(+), 130 deletions(-)
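
A note on the shape of the change before the per-file diffs: the static
lookups on HoodieFileReaderFactory/HoodieFileWriterFactory are replaced by a
pluggable HoodieIOFactory resolved from the storage configuration via the new
`hoodie.io.factory.class` config. A minimal sketch of the new call pattern,
using only classes touched in this commit (the Hadoop Configuration wiring
shown here is an illustrative assumption, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.hadoop.fs.HadoopFSUtils;
    import org.apache.hudi.io.storage.HoodieFileReaderFactory;
    import org.apache.hudi.io.storage.HoodieIOFactory;
    import org.apache.hudi.storage.StorageConfiguration;

    public class IOFactoryExample { // hypothetical driver, for illustration
      public static void main(String[] args) {
        Configuration hadoopConf = new Configuration();
        // Optional override; defaults to org.apache.hudi.io.storage.HoodieHadoopIOFactory.
        hadoopConf.set("hoodie.io.factory.class",
            "org.apache.hudi.io.storage.HoodieHadoopIOFactory");
        StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(hadoopConf);
        // Resolve the configured factory, then ask it for a reader factory.
        HoodieFileReaderFactory readerFactory = HoodieIOFactory.getIOFactory(storageConf)
            .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO);
      }
    }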

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 808bfdfa863..db32112750a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -48,6 +48,7 @@ import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.io.HoodieMergedReadHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -185,7 +186,8 @@ public class HoodieIndexUtils {
                                                 StorageConfiguration<?> configuration) throws HoodieIndexException {
     ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
     List<String> foundRecordKeys = new ArrayList<>();
-    try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+    try (HoodieFileReader fileReader = HoodieIOFactory.getIOFactory(configuration)
+        .getReaderFactory(HoodieRecordType.AVRO)
         .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, configuration, filePath)) {
       // Load all rowKeys from the file, to double-confirm
       if (!candidateRecordKeys.isEmpty()) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index ed18a2f0055..3c3a820ab09 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -44,9 +44,9 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
@@ -462,7 +462,8 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
     }
 
     long oldNumWrites = 0;
-    try (HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(this.recordMerger.getRecordType())
+    try (HoodieFileReader reader = HoodieIOFactory.getIOFactory(storage.getConf())
+        .getReaderFactory(this.recordMerger.getRecordType())
         .getFileReader(config, hoodieTable.getStorageConf(), oldFilePath)) {
       oldNumWrites = reader.getTotalRecords();
     } catch (IOException e) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
index 5f9afc1bad1..01678b68e96 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
@@ -23,7 +23,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.table.HoodieTable;
 
@@ -69,12 +69,12 @@ public abstract class HoodieReadHandle<T, I, K, O> extends HoodieIOHandle<T, I,
   }
 
   protected HoodieFileReader createNewFileReader() throws IOException {
-    return HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType())
+    return HoodieIOFactory.getIOFactory(storage.getConf()).getReaderFactory(this.config.getRecordMerger().getRecordType())
         .getFileReader(config, hoodieTable.getStorageConf(), getLatestBaseFile().getStoragePath());
   }
 
   protected HoodieFileReader createNewFileReader(HoodieBaseFile hoodieBaseFile) throws IOException {
-    return HoodieFileReaderFactory.getReaderFactory(this.config.getRecordMerger().getRecordType())
+    return HoodieIOFactory.getIOFactory(storage.getConf()).getReaderFactory(this.config.getRecordMerger().getRecordType())
         .getFileReader(config, hoodieTable.getStorageConf(), hoodieBaseFile.getStoragePath());
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 38383fd7a88..a13253bc1b0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -37,7 +37,7 @@ import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
@@ -80,7 +80,7 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper {
 
     StorageConfiguration<?> storageConf = table.getStorageConf().newInstance();
     HoodieRecord.HoodieRecordType recordType = table.getConfig().getRecordMerger().getRecordType();
-    HoodieFileReader baseFileReader = HoodieFileReaderFactory
+    HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(storageConf)
         .getReaderFactory(recordType)
         .getFileReader(writeConfig, storageConf, mergeHandle.getOldFilePath());
     HoodieFileReader bootstrapFileReader = null;
@@ -112,9 +112,10 @@ public class HoodieMergeHelper<T> extends BaseMergeHelper {
       if (baseFile.getBootstrapBaseFile().isPresent()) {
         StoragePath bootstrapFilePath = baseFile.getBootstrapBaseFile().get().getStoragePath();
         StorageConfiguration<?> bootstrapFileConfig = table.getStorageConf().newInstance();
-        bootstrapFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader(
+        bootstrapFileReader = HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(recordType).newBootstrapFileReader(
             baseFileReader,
-            HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(writeConfig, bootstrapFileConfig, bootstrapFilePath),
+            HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(recordType)
+                .getFileReader(writeConfig, bootstrapFileConfig, bootstrapFilePath),
             mergeHandle.getPartitionFields(),
             mergeHandle.getPartitionValues());
         recordSchema = mergeHandle.getWriterSchemaWithMetaFields();
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
index 4a342cbcec2..34972f01832 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
@@ -30,7 +30,7 @@ import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
@@ -145,9 +145,10 @@ public class GenericRecordValidationTestUtils {
   public static Stream<GenericRecord> readHFile(Configuration conf, String[] paths) {
     List<GenericRecord> valuesAsList = new LinkedList<>();
     for (String path : paths) {
+      StorageConfiguration storageConf = HadoopFSUtils.getStorageConf(conf);
       try (HoodieAvroHFileReaderImplBase reader = (HoodieAvroHFileReaderImplBase)
-          HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
-              .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, HadoopFSUtils.getStorageConf(conf), new StoragePath(path), HoodieFileFormat.HFILE)) {
+          HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+              .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, storageConf, new StoragePath(path), HoodieFileFormat.HFILE)) {
         valuesAsList.addAll(HoodieAvroHFileReaderImplBase.readAllRecords(reader)
             .stream().map(e -> (GenericRecord) e).collect(Collectors.toList()));
       } catch (IOException e) {
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index 02021dcc405..5b216807932 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -43,7 +43,7 @@ import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFac
 import org.apache.hudi.execution.bulkinsert.JavaCustomColumnsSortPartitioner;
 import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
@@ -192,7 +192,7 @@ public abstract class JavaExecutionStrategy<T>
 
         baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
             ? Option.empty()
-            : Option.of(HoodieFileReaderFactory.getReaderFactory(recordType)
+            : Option.of(HoodieIOFactory.getIOFactory(table.getStorageConf()).getReaderFactory(recordType)
            .getFileReader(config, table.getStorageConf(), new StoragePath(clusteringOp.getDataFilePath())));
         HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
         Iterator<HoodieRecord<T>> fileSliceReader = new HoodieFileSliceReader(baseFileReader, scanner, readerSchema, tableConfig.getPreCombineField(), writeConfig.getRecordMerger(),
@@ -221,7 +221,7 @@ public abstract class JavaExecutionStrategy<T>
   private List<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
     List<HoodieRecord<T>> records = new ArrayList<>();
     clusteringOps.forEach(clusteringOp -> {
-      try (HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType)
+      try (HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(getHoodieTable().getStorageConf()).getReaderFactory(recordType)
          .getFileReader(getHoodieTable().getConfig(), getHoodieTable().getStorageConf(), new StoragePath(clusteringOp.getDataFilePath()))) {
         Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
         Iterator<HoodieRecord> recordIterator = baseFileReader.getRecordIterator(readerSchema);
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 8e62d640530..c2413133477 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -85,7 +85,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
@@ -544,7 +544,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
     List<FileSlice> fileSlices = table.getSliceView().getLatestFileSlices("files").collect(Collectors.toList());
     HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
     HoodieAvroHFileReaderImplBase hoodieHFileReader = (HoodieAvroHFileReaderImplBase)
-        HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
+        HoodieIOFactory.getIOFactory(context.getStorageConf()).getReaderFactory(HoodieRecordType.AVRO).getFileReader(
            writeConfig, context.getStorageConf(), new StoragePath(baseFile.getPath()));
     List<IndexedRecord> records = HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
     records.forEach(entry -> {
@@ -971,7 +971,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
     final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
 
     HoodieAvroHFileReaderImplBase hoodieHFileReader = (HoodieAvroHFileReaderImplBase)
-        HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
+        HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecordType.AVRO).getFileReader(
            table.getConfig(), context.getStorageConf(), new StoragePath(baseFile.getPath()));
     List<IndexedRecord> records = HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
     records.forEach(entry -> {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index ea1ae05e2b0..fe1e6710673 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -55,7 +55,6 @@ import org.apache.hudi.execution.bulkinsert.RowCustomColumnsSortPartitioner;
 import org.apache.hudi.execution.bulkinsert.RowSpatialCurveSortPartitioner;
 import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -93,6 +92,7 @@ import java.util.stream.Stream;
 import static org.apache.hudi.client.utils.SparkPartitionUtils.getPartitionFieldVals;
 import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
 import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
+import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
 
 /**
  * Clustering strategy to submit multiple spark jobs and union the results.
@@ -380,7 +380,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
 
   private HoodieFileReader getBaseOrBootstrapFileReader(StorageConfiguration<?> storageConf, String bootstrapBasePath, Option<String[]> partitionFields, ClusteringOperation clusteringOp)
       throws IOException {
-    HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType)
+    HoodieFileReader baseFileReader = getHoodieSparkIOFactory().getReaderFactory(recordType)
        .getFileReader(writeConfig, storageConf, new StoragePath(clusteringOp.getDataFilePath()));
     // handle bootstrap path
     if (StringUtils.nonEmpty(clusteringOp.getBootstrapFilePath()) && StringUtils.nonEmpty(bootstrapBasePath)) {
@@ -392,9 +392,9 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
         partitionValues = getPartitionFieldVals(partitionFields, partitionFilePath, bootstrapBasePath, baseFileReader.getSchema(),
             storageConf.unwrapAs(Configuration.class));
       }
-      baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType).newBootstrapFileReader(
+      baseFileReader = getHoodieSparkIOFactory().getReaderFactory(recordType).newBootstrapFileReader(
          baseFileReader,
-          HoodieFileReaderFactory.getReaderFactory(recordType).getFileReader(
+          getHoodieSparkIOFactory().getReaderFactory(recordType).getFileReader(
             writeConfig, storageConf, new StoragePath(bootstrapFilePath)), partitionFields,
          partitionValues);
     }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
index 50eb9d4bd7a..06ba64dad89 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
@@ -39,7 +39,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.storage.StoragePath;
@@ -64,6 +63,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
+import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
+
 /**
  * Clustering strategy to submit single spark jobs.
  * MultipleSparkJobExecution strategy is not ideal for use cases that require large number of clustering groups
@@ -146,7 +147,7 @@ public abstract class SingleSparkJobExecutionStrategy<T>
       Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
       Iterable<HoodieRecord<T>> indexedRecords = () -> {
         try {
-          HoodieFileReader baseFileReader = HoodieFileReaderFactory.getReaderFactory(recordType)
+          HoodieFileReader baseFileReader = getHoodieSparkIOFactory().getReaderFactory(recordType)
              .getFileReader(writeConfig, getHoodieTable().getStorageConf(), new StoragePath(clusteringOp.getDataFilePath()));
           Option<BaseKeyGenerator> keyGeneratorOp = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(writeConfig);
           // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkIOFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkIOFactory.java
new file mode 100644
index 00000000000..16431d61551
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkIOFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.model.HoodieRecord;
+
+/**
+ * Creates readers and writers for SPARK and AVRO record payloads
+ */
+public class HoodieSparkIOFactory extends HoodieHadoopIOFactory {
+  private static final HoodieSparkIOFactory HOODIE_SPARK_IO_FACTORY = new HoodieSparkIOFactory();
+
+  public static HoodieSparkIOFactory getHoodieSparkIOFactory() {
+    return HOODIE_SPARK_IO_FACTORY;
+  }
+
+  @Override
+  public HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
+    if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
+      return new HoodieSparkFileReaderFactory();
+    }
+    return super.getReaderFactory(recordType);
+  }
+
+  @Override
+  public HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
+    if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
+      return new HoodieSparkFileWriterFactory();
+    }
+    return super.getWriterFactory(recordType);
+  }
+}
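
The Spark factory above is exposed as a process-wide singleton; the Spark-side
call sites in this patch resolve it directly instead of going through the
config lookup. For example, straight from the class above:

    // SPARK payloads get the Spark reader factory; other record types fall
    // through to the Hadoop/Avro factories via the superclass.
    HoodieFileReaderFactory readers = HoodieSparkIOFactory.getHoodieSparkIOFactory()
        .getReaderFactory(HoodieRecord.HoodieRecordType.SPARK);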
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index 151e88432e3..adc6a456ac9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -31,7 +31,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.HoodieBootstrapHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.keygen.KeyGeneratorInterface;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
@@ -58,6 +57,7 @@ import java.io.IOException;
 import java.util.function.Function;
 
 import static org.apache.hudi.io.HoodieBootstrapHandle.METADATA_BOOTSTRAP_RECORD_SCHEMA;
+import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
 
 class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
 
@@ -82,7 +82,7 @@ class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
                                   Schema schema) throws Exception {
     HoodieRecord.HoodieRecordType recordType = table.getConfig().getRecordMerger().getRecordType();
 
-    HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(recordType)
+    HoodieFileReader reader = getHoodieSparkIOFactory().getReaderFactory(recordType)
         .getFileReader(table.getConfig(), table.getStorageConf(), sourceFilePath);
 
     HoodieExecutor<Void> executor = null;
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 9301529c740..a83fcd4bf27 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -88,7 +88,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
@@ -172,6 +171,7 @@ import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAM
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getNextCommitTime;
 import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS;
 import static org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.METADATA_COMPACTION_TIME_SUFFIX;
+import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
 import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
 import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
@@ -821,7 +821,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     List<FileSlice> fileSlices = table.getSliceView().getLatestFileSlices("files").collect(Collectors.toList());
     HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
     HoodieAvroHFileReaderImplBase hoodieHFileReader = (HoodieAvroHFileReaderImplBase)
-        HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
+        getHoodieSparkIOFactory().getReaderFactory(HoodieRecordType.AVRO).getFileReader(
            table.getConfig(), context.getStorageConf(), new StoragePath(baseFile.getPath()));
     List<IndexedRecord> records = HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
     records.forEach(entry -> {
@@ -1354,9 +1354,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     }
     final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
 
-    HoodieAvroHFileReaderImplBase hoodieHFileReader = (HoodieAvroHFileReaderImplBase)
-        HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
-            table.getConfig(), context.getStorageConf(), new StoragePath(baseFile.getPath()));
+    HoodieAvroHFileReaderImplBase hoodieHFileReader = (HoodieAvroHFileReaderImplBase) getHoodieSparkIOFactory()
+        .getReaderFactory(HoodieRecordType.AVRO)
+        .getFileReader(table.getConfig(), context.getStorageConf(), new StoragePath(baseFile.getPath()));
     List<IndexedRecord> records = HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
     records.forEach(entry -> {
       if (enableMetaFields) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 3310dda5633..c4a79f1ea71 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -46,7 +46,6 @@ import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
@@ -87,6 +86,7 @@ import static org.apache.hudi.common.model.WriteOperationType.COMPACT;
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
+import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
 import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -527,7 +527,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
     final HoodieBaseFile baseFile = fileSlices.get(0).getBaseFile().get();
 
     HoodieAvroHFileReaderImplBase hoodieHFileReader = (HoodieAvroHFileReaderImplBase)
-        HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
+        getHoodieSparkIOFactory().getReaderFactory(HoodieRecordType.AVRO).getFileReader(
            table.getConfig(), context.getStorageConf(), new StoragePath(baseFile.getPath()));
     List<IndexedRecord> records = HoodieAvroHFileReaderImplBase.readAllRecords(hoodieHFileReader);
     records.forEach(entry -> {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index f3ad183def4..0309aee00a9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -235,6 +235,14 @@ public class HoodieStorageConfig extends HoodieConfig {
           + "and it is loaded at runtime. This is only required when trying to "
           + "override the existing write context when `hoodie.datasource.write.row.writer.enable=true`.");
 
+  public static final ConfigProperty<String> HOODIE_IO_FACTORY_CLASS = ConfigProperty
+      .key("hoodie.io.factory.class")
+      .defaultValue("org.apache.hudi.io.storage.HoodieHadoopIOFactory")
+      .markAdvanced()
+      .sinceVersion("0.15.0")
+      .withDocumentation("The fully-qualified class name of the factory class to return readers and writers of files used "
+          + "by Hudi. The provided class should implement `org.apache.hudi.io.storage.HoodieIOFactory`.");
+
 
 
   /**
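
With this property in place, swapping the I/O stack needs no code change; for
example, a job could opt into the Spark-aware factory added in this commit
with a single config line (how the property reaches the storage configuration
is deployment-specific and shown here only for illustration):

    hoodie.io.factory.class=org.apache.hudi.io.storage.HoodieSparkIOFactory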
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 219fa2dc1c7..f3b79e05787 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -33,7 +33,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -192,11 +192,10 @@ public class HoodieHFileDataBlock {
     StorageConfiguration<?> storageConf = getBlockContentLocation().get().getStorageConf().getInline();
     HoodieStorage storage = HoodieStorageUtils.getStorage(pathForReader, storageConf);
     // Read the content
-    try (HoodieFileReader reader =
-             HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getContentReader(
-
-                 hFileReaderConfig, storageConf, pathForReader, HoodieFileFormat.HFILE, storage, content,
-                 Option.of(getSchemaFromHeader()))) {
+    try (HoodieFileReader reader = HoodieIOFactory.getIOFactory(storageConf)
+        .getReaderFactory(HoodieRecordType.AVRO)
+        .getContentReader(hFileReaderConfig, storageConf, pathForReader,
+            HoodieFileFormat.HFILE, storage, content, Option.of(getSchemaFromHeader()))) {
       return unsafeCast(reader.getRecordIterator(readerSchema));
     }
   }
@@ -216,10 +215,9 @@ public class HoodieHFileDataBlock {
         blockContentLoc.getContentPositionInLogFile(),
         blockContentLoc.getBlockSize());
 
-    try (final HoodieAvroHFileReaderImplBase reader = (HoodieAvroHFileReaderImplBase)
-        HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
-            hFileReaderConfig, inlineConf, inlinePath, HoodieFileFormat.HFILE,
-            Option.of(getSchemaFromHeader()))) {
+    try (final HoodieAvroHFileReaderImplBase reader = (HoodieAvroHFileReaderImplBase) HoodieIOFactory.getIOFactory(inlineConf)
+        .getReaderFactory(HoodieRecordType.AVRO)
+        .getFileReader(hFileReaderConfig, inlineConf, inlinePath, HoodieFileFormat.HFILE, Option.of(getSchemaFromHeader()))) {
       // Get writer's schema from the header
       final ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator =
           fullKey ? reader.getRecordsByKeysIterator(sortedKeys, readerSchema) : reader.getRecordsByKeyPrefixIterator(sortedKeys, readerSchema);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index 28c025c9020..32f4f46a955 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -25,9 +25,9 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.io.SeekableDataInputStream;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.inline.InLineFSUtils;
@@ -150,7 +150,7 @@ public class HoodieParquetDataBlock {
 
     Schema writerSchema = new Schema.Parser().parse(this.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
 
-    ClosableIterator<HoodieRecord<T>> iterator = HoodieFileReaderFactory.getReaderFactory(type)
+    ClosableIterator<HoodieRecord<T>> iterator = HoodieIOFactory.getIOFactory(inlineConf).getReaderFactory(type)
         .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, inlineConf, inlineLogFilePath, PARQUET, Option.empty())
         .getRecordIterator(writerSchema, readerSchema);
     return iterator;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index 587fd31866e..8914fa5249b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -35,6 +35,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
index c285f04a2b2..8637c468fdd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
@@ -22,10 +22,7 @@ import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -43,30 +40,6 @@ import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
  */
 public class HoodieFileReaderFactory {
 
-  public static HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
-    switch (recordType) {
-      case AVRO:
-
-        try {
-          Class<?> clazz =
-              ReflectionUtils.getClass("org.apache.hudi.io.hadoop.HoodieAvroFileReaderFactory");
-          return (HoodieFileReaderFactory) clazz.newInstance();
-        } catch (IllegalArgumentException | IllegalAccessException | InstantiationException e) {
-          throw new HoodieException("Unable to create HoodieAvroFileReaderFactory", e);
-        }
-      case SPARK:
-        try {
-          Class<?> clazz =
-              ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileReaderFactory");
-          return (HoodieFileReaderFactory) clazz.newInstance();
-        } catch (IllegalArgumentException | IllegalAccessException | InstantiationException e) {
-          throw new HoodieException("Unable to create HoodieSparkFileReaderFactory", e);
-        }
-      default:
-        throw new UnsupportedOperationException(recordType + " record type not supported yet.");
-    }
-  }
-
   public HoodieFileReader getFileReader(HoodieConfig hoodieConfig, StorageConfiguration<?> conf, StoragePath path) throws IOException {
     final String extension = FSUtils.getFileExtension(path.toString());
     if (PARQUET.getFileExtension().equals(extension)) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index 1c588bce8af..c0e154ed6ab 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -25,10 +25,7 @@ import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
@@ -43,39 +40,18 @@ import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 
 public class HoodieFileWriterFactory {
 
-  private static HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
-    switch (recordType) {
-      case AVRO:
-        try {
-          Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.hadoop.HoodieAvroFileWriterFactory");
-          return (HoodieFileWriterFactory) clazz.newInstance();
-        } catch (IllegalAccessException | IllegalArgumentException | InstantiationException e) {
-          throw new HoodieException("Unable to create HoodieAvroFileWriterFactory", e);
-        }
-      case SPARK:
-        try {
-          Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileWriterFactory");
-          return (HoodieFileWriterFactory) clazz.newInstance();
-        } catch (IllegalAccessException | IllegalArgumentException | InstantiationException e) {
-          throw new HoodieException("Unable to create HoodieSparkFileWriterFactory", e);
-        }
-      default:
-        throw new UnsupportedOperationException(recordType + " record type not supported yet.");
-    }
-  }
-
   public static <T, I, K, O> HoodieFileWriter getFileWriter(
       String instantTime, StoragePath path, StorageConfiguration<?> conf, HoodieConfig config, Schema schema,
       TaskContextSupplier taskContextSupplier, HoodieRecordType recordType) throws IOException {
     final String extension = FSUtils.getFileExtension(path.getName());
-    HoodieFileWriterFactory factory = getWriterFactory(recordType);
+    HoodieFileWriterFactory factory = HoodieIOFactory.getIOFactory(conf).getWriterFactory(recordType);
     return factory.getFileWriterByFormat(extension, instantTime, path, conf, config, schema, taskContextSupplier);
   }
 
   public static <T, I, K, O> HoodieFileWriter getFileWriter(HoodieFileFormat format, OutputStream outputStream,
                                                             StorageConfiguration<?> conf, HoodieConfig config, Schema schema, HoodieRecordType recordType)
       throws IOException {
-    HoodieFileWriterFactory factory = getWriterFactory(recordType);
+    HoodieFileWriterFactory factory = HoodieIOFactory.getIOFactory(conf).getWriterFactory(recordType);
     return factory.getFileWriterByFormat(format, outputStream, conf, config, schema);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
new file mode 100644
index 00000000000..3e715366134
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieIOFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.StorageConfiguration;
+
+/**
+ * Base class to get HoodieFileReaderFactory and HoodieFileWriterFactory
+ */
+public abstract class HoodieIOFactory {
+
+  public static HoodieIOFactory getIOFactory(StorageConfiguration<?> storageConf) {
+    String ioFactoryClass = storageConf.getString(HoodieStorageConfig.HOODIE_IO_FACTORY_CLASS.key())
+        .orElse(HoodieStorageConfig.HOODIE_IO_FACTORY_CLASS.defaultValue());
+    return getIOFactory(ioFactoryClass);
+  }
+
+  private static HoodieIOFactory getIOFactory(String ioFactoryClass) {
+    try {
+      return ReflectionUtils.loadClass(ioFactoryClass);
+    } catch (Exception e) {
+      throw new HoodieException("Unable to create " + ioFactoryClass, e);
+    }
+  }
+
+  public abstract HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType);
+
+  public abstract HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType);
+
+}
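
Because getIOFactory instantiates the configured class via
ReflectionUtils.loadClass, a custom implementation needs a public no-argument
constructor. A hypothetical subclass (the name MyIOFactory and its package are
illustrative only, not part of this patch):

    package com.example.hudi; // hypothetical

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.io.storage.HoodieFileReaderFactory;
    import org.apache.hudi.io.storage.HoodieFileWriterFactory;
    import org.apache.hudi.io.storage.HoodieHadoopIOFactory;

    // Delegates to the stock Hadoop factories; a real implementation could
    // return custom reader/writer factories per record type instead.
    public class MyIOFactory extends HoodieHadoopIOFactory {
      @Override
      public HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
        return super.getReaderFactory(recordType);
      }

      @Override
      public HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
        return super.getWriterFactory(recordType);
      }
    }

It would then be selected with hoodie.io.factory.class=com.example.hudi.MyIOFactory.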
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 68932a5224f..74079e8845a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -49,7 +49,7 @@ import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.expression.BindVisitor;
 import org.apache.hudi.expression.Expression;
 import org.apache.hudi.internal.schema.Types;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.io.storage.HoodieSeekingFileReader;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.util.Transient;
@@ -446,7 +446,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     Option<HoodieBaseFile> basefile = slice.getBaseFile();
     if (basefile.isPresent()) {
       StoragePath baseFilePath = basefile.get().getStoragePath();
-      baseFileReader = (HoodieSeekingFileReader<?>) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+      baseFileReader = (HoodieSeekingFileReader<?>) HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecordType.AVRO)
          .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, getStorageConf(), baseFilePath);
       baseFileOpenMs = timer.endTimer();
       LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", baseFilePath,
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index cc12c03676f..8c2ccf5f080 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -70,7 +70,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -504,9 +504,9 @@ public class HoodieTableMetadataUtil {
       }
 
       final StoragePath writeFilePath = new StoragePath(dataMetaClient.getBasePathV2(), pathWithPartition);
-      try (HoodieFileReader fileReader =
-               HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
-                   hoodieConfig, dataMetaClient.getStorageConf(), writeFilePath)) {
+      try (HoodieFileReader fileReader = HoodieIOFactory.getIOFactory(dataMetaClient.getStorageConf())
+          .getReaderFactory(HoodieRecordType.AVRO).getFileReader(hoodieConfig,
+              dataMetaClient.getStorageConf(), writeFilePath)) {
         try {
           final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
           if (fileBloomFilter == null) {
@@ -926,7 +926,7 @@ public class HoodieTableMetadataUtil {
 
   private static ByteBuffer readBloomFilter(StorageConfiguration<?> conf, StoragePath filePath) throws IOException {
     HoodieConfig hoodieConfig = getReaderConfigs(conf);
-    try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+    try (HoodieFileReader fileReader = HoodieIOFactory.getIOFactory(conf).getReaderFactory(HoodieRecordType.AVRO)
        .getFileReader(hoodieConfig, conf, filePath)) {
       final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
       if (fileBloomFilter == null) {
@@ -1781,7 +1781,7 @@ public class HoodieTableMetadataUtil {
 
       final String fileId = baseFile.getFileId();
       final String instantTime = baseFile.getCommitTime();
-      HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+      HoodieFileReader reader = HoodieIOFactory.getIOFactory(configuration).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
          .getFileReader(config, configuration, dataFilePath);
       return getHoodieRecordIterator(reader.getRecordKeyIterator(), forDelete, partition, fileId, instantTime);
     });
@@ -1842,7 +1842,7 @@ public class HoodieTableMetadataUtil {
       final String fileId = baseFile.getFileId();
       final String instantTime = baseFile.getCommitTime();
       HoodieConfig hoodieConfig = getReaderConfigs(storageConf);
-      HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+      HoodieFileReader reader = HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
          .getFileReader(hoodieConfig, storageConf, dataFilePath);
       return getHoodieRecordIterator(reader.getRecordKeyIterator(), forDelete, partition, fileId, instantTime);
     });
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 93a2f5d45d2..3709c27a8b8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -44,6 +44,7 @@ import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.metrics.FlinkClusteringMetrics;
 import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
 import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
@@ -273,7 +274,8 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
       try {
         Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
             ? Option.empty()
-            : Option.of(HoodieFileReaderFactory.getReaderFactory(table.getConfig().getRecordMerger().getRecordType())
+            : Option.of(HoodieIOFactory.getIOFactory(table.getStorageConf())
+            .getReaderFactory(table.getConfig().getRecordMerger().getRecordType())
            .getFileReader(table.getConfig(), table.getStorageConf(), new StoragePath(clusteringOp.getDataFilePath())));
         HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
             .withStorage(table.getMetaClient().getStorage())
@@ -320,7 +322,8 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
     List<Iterator<RowData>> iteratorsForPartition = clusteringOps.stream().map(clusteringOp -> {
       Iterable<IndexedRecord> indexedRecords = () -> {
         try {
-          HoodieFileReaderFactory fileReaderFactory = HoodieFileReaderFactory.getReaderFactory(table.getConfig().getRecordMerger().getRecordType());
+          HoodieFileReaderFactory fileReaderFactory = HoodieIOFactory.getIOFactory(table.getStorageConf())
+              .getReaderFactory(table.getConfig().getRecordMerger().getRecordType());
           HoodieAvroFileReader fileReader = (HoodieAvroFileReader) fileReaderFactory.getFileReader(
              table.getConfig(), table.getStorageConf(), new StoragePath(clusteringOp.getDataFilePath()));
 
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
index ad42c0e86fb..52c26477f47 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
@@ -26,7 +26,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -100,7 +100,8 @@ public class HFileUtils extends BaseFileUtils {
     LOG.info("Reading schema from {}", filePath);
 
     try (HoodieFileReader fileReader =
-             HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+             HoodieIOFactory.getIOFactory(configuration)
+                 .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
                 .getFileReader(
                     ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER,
                     configuration,
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieHadoopIOFactory.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieHadoopIOFactory.java
new file mode 100644
index 00000000000..65c8d028adb
--- /dev/null
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieHadoopIOFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.hadoop.HoodieAvroFileReaderFactory;
+import org.apache.hudi.io.hadoop.HoodieAvroFileWriterFactory;
+
+/**
+ * Creates readers and writers for AVRO record payloads.
+ * Currently uses reflection to support SPARK record payloads but
+ * this ability should be removed with [HUDI-7746]
+ */
+public class HoodieHadoopIOFactory extends HoodieIOFactory {
+
+  @Override
+  public HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
+    switch (recordType) {
+      case AVRO:
+        return new HoodieAvroFileReaderFactory();
+      case SPARK:
+        //TODO: remove this case [HUDI-7746]
+        try {
+          return ReflectionUtils.loadClass("org.apache.hudi.io.storage.HoodieSparkFileReaderFactory");
+        } catch (Exception e) {
+          throw new HoodieException("Unable to create HoodieSparkFileReaderFactory", e);
+        }
+      default:
+        throw new UnsupportedOperationException(recordType + " record type not supported");
+    }
+  }
+
+  @Override
+  public HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
+    switch (recordType) {
+      case AVRO:
+        return new HoodieAvroFileWriterFactory();
+      case SPARK:
+        //TODO: remove this case [HUDI-7746]
+        try {
+          return ReflectionUtils.loadClass("org.apache.hudi.io.storage.HoodieSparkFileWriterFactory");
+        } catch (Exception e) {
+          throw new HoodieException("Unable to create HoodieSparkFileWriterFactory", e);
+        }
+      default:
+        throw new UnsupportedOperationException(recordType + " record type not supported");
+    }
+  }
+}
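Standalone, the dispatch above behaves as sketched below. This is illustrative only, not part of the commit; the SPARK branches succeed only when the hudi-spark module is on the classpath, since those factories are loaded reflectively until [HUDI-7746].

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.io.storage.HoodieFileReaderFactory;
    import org.apache.hudi.io.storage.HoodieFileWriterFactory;
    import org.apache.hudi.io.storage.HoodieHadoopIOFactory;
    import org.apache.hudi.io.storage.HoodieIOFactory;

    class HadoopIOFactorySketch {
      static void demo() {
        HoodieIOFactory ioFactory = new HoodieHadoopIOFactory();
        // AVRO factories are instantiated directly:
        HoodieFileReaderFactory readers = ioFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO);
        HoodieFileWriterFactory writers = ioFactory.getWriterFactory(HoodieRecord.HoodieRecordType.AVRO);
        // SPARK goes through ReflectionUtils.loadClass(...) and surfaces a
        // HoodieException if the Spark factories are absent from the classpath.
      }
    }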
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
index 7faf84a1ee5..85731674cd6 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
@@ -48,7 +49,7 @@ public class TestHoodieAvroFileReaderFactory {
     // parquet file format.
     final StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(new Configuration());
     final StoragePath parquetPath = new StoragePath("/partition/path/f1_1-0-1_000.parquet");
-    HoodieFileReader parquetReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+    HoodieFileReader parquetReader = HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecordType.AVRO)
         .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, storageConf, parquetPath);
     assertTrue(parquetReader instanceof HoodieAvroParquetReader);
 
@@ -56,14 +57,15 @@ public class TestHoodieAvroFileReaderFactory {
     final StoragePath logPath = new StoragePath(
         "/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1");
     final Throwable thrown = assertThrows(UnsupportedOperationException.class, () -> {
-      HoodieFileReader logWriter = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+      HoodieFileReader logWriter = HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecordType.AVRO)
           .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, storageConf, logPath);
     }, "should fail since log storage reader is not supported yet.");
     assertTrue(thrown.getMessage().contains("format not supported yet."));
 
     // Orc file format.
     final StoragePath orcPath = new StoragePath("/partition/path/f1_1-0-1_000.orc");
-    HoodieFileReader orcReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+    HoodieFileReader orcReader = HoodieIOFactory.getIOFactory(storageConf)
+        .getReaderFactory(HoodieRecordType.AVRO)
         .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, storageConf, orcPath);
     assertTrue(orcReader instanceof HoodieAvroOrcReader);
   }
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
index 6a94a32ed3c..0cf0ca9d445 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.io.storage.HoodieOrcConfig;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -78,7 +78,7 @@ public class TestHoodieOrcReaderWriter extends TestHoodieReaderWriterBase {
   @Override
   protected HoodieAvroFileReader createReader(
       StorageConfiguration<?> conf) throws Exception {
-    return (HoodieAvroFileReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+    return (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(conf).getReaderFactory(HoodieRecordType.AVRO)
         .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, conf, getFilePath());
   }
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
index 97177ab260d..85e9fcac311 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileRecordReader.java
@@ -26,7 +26,8 @@ import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
@@ -56,8 +57,9 @@ public class HoodieHFileRecordReader implements RecordReader<NullWritable, Array
   public HoodieHFileRecordReader(Configuration conf, InputSplit split, JobConf job) throws IOException {
     FileSplit fileSplit = (FileSplit) split;
     StoragePath path = convertToStoragePath(fileSplit.getPath());
-    HoodieConfig hoodieConfig = getReaderConfigs(HadoopFSUtils.getStorageConf(conf));
-    reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+    StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(conf);
+    HoodieConfig hoodieConfig = getReaderConfigs(storageConf);
+    reader = HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
         .getFileReader(hoodieConfig, HadoopFSUtils.getStorageConf(conf), path, HoodieFileFormat.HFILE, Option.empty());
 
     schema = reader.getSchema();
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index 666e51b81de..6d4b79c6896 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -25,7 +25,8 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalType;
@@ -306,8 +307,9 @@ public class HoodieRealtimeRecordReaderUtils {
   }
 
   public static HoodieFileReader getBaseFileReader(Path path, JobConf conf) throws IOException {
-    HoodieConfig hoodieConfig = getReaderConfigs(HadoopFSUtils.getStorageConf(conf));
-    return HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+    StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(conf);
+    HoodieConfig hoodieConfig = getReaderConfigs(storageConf);
+    return HoodieIOFactory.getIOFactory(storageConf).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
         .getFileReader(hoodieConfig, HadoopFSUtils.getStorageConf(conf), convertToStoragePath(path));
   }
 
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 0fcae011638..fd3cc287323 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -43,7 +43,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -274,7 +274,8 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
     if (fileSlice.getBaseFile().isPresent()) {
       // Read the base files using the latest writer schema.
       Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
-      HoodieAvroFileReader reader = TypeUtils.unsafeCast(HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+      HoodieAvroFileReader reader = TypeUtils.unsafeCast(HoodieIOFactory.getIOFactory(metaClient.getStorageConf())
+          .getReaderFactory(HoodieRecordType.AVRO)
           .getFileReader(
               DEFAULT_HUDI_CONFIG_FOR_READER,
               metaClient.getStorageConf(),
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index c432707d4e2..3a942285f09 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -21,7 +21,7 @@ import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION, STREAMING_CHECKPOINT_IDENTIFIER}
 import org.apache.hudi.cdc.CDCRelation
 import org.apache.hudi.common.HoodieSchemaNotFoundException
-import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
 import org.apache.hudi.common.model.WriteConcurrencyMode
 import org.apache.hudi.common.table.timeline.HoodieInstant
@@ -32,6 +32,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
 import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.io.storage.HoodieSparkIOFactory
 import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
 import org.apache.hudi.util.PathUtils
 
@@ -65,6 +66,9 @@ class DefaultSource extends RelationProvider
       // Enable "passPartitionByAsOptions" to support "write.partitionBy(...)"
       spark.conf.set("spark.sql.legacy.sources.write.passPartitionByAsOptions", "true")
     }
+    // Always use spark io factory
+    spark.sparkContext.hadoopConfiguration.set(HoodieStorageConfig.HOODIE_IO_FACTORY_CLASS.key(),
+      classOf[HoodieSparkIOFactory].getName)
     // Revisit EMRFS incompatibilities, for now disable
     spark.sparkContext.hadoopConfiguration.set("fs.s3.metadata.cache.expiration.seconds", "0")
   }
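For completeness, a sketch of pointing the new config at a custom factory outside of Spark's automatic wiring. MyCustomIOFactory is a hypothetical class name, and the snippet assumes HoodieIOFactory.getIOFactory resolves whatever implementation the key names; neither is shown in this commit's hunks.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.config.HoodieStorageConfig;
    import org.apache.hudi.hadoop.fs.HadoopFSUtils;
    import org.apache.hudi.io.storage.HoodieIOFactory;
    import org.apache.hudi.storage.StorageConfiguration;

    class CustomIOFactorySketch {
      static HoodieIOFactory resolve() {
        StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(new Configuration());
        // Route all readers/writers through a custom implementation
        // ("org.example.MyCustomIOFactory" is hypothetical):
        storageConf.set(HoodieStorageConfig.HOODIE_IO_FACTORY_CLASS.key(),
            "org.example.MyCustomIOFactory");
        return HoodieIOFactory.getIOFactory(storageConf);
      }
    }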
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index ee815188d8e..a6f661c9e46 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -43,7 +43,7 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
-import org.apache.hudi.io.storage.HoodieFileReaderFactory
+import org.apache.hudi.io.storage.HoodieSparkIOFactory
 import org.apache.hudi.metadata.HoodieTableMetadata
 import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
 import org.apache.avro.Schema
@@ -758,7 +758,7 @@ object HoodieBaseRelation extends SparkAdapterSupport {
       val hoodieConfig = new HoodieConfig()
       hoodieConfig.setValue(USE_NATIVE_HFILE_READER,
         options.getOrElse(USE_NATIVE_HFILE_READER.key(), USE_NATIVE_HFILE_READER.defaultValue().toString))
-      val reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+      val reader = (new HoodieSparkIOFactory).getReaderFactory(HoodieRecordType.AVRO)
         .getFileReader(hoodieConfig, storageConf, filePath, HFILE)
 
       val requiredRowSchema = requiredDataSchema.structTypeSchema
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 0ec37e4a8fa..7ceaddeeb12 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -62,7 +62,6 @@ import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.storage.HoodieStorage;
@@ -108,6 +107,7 @@ import static org.apache.hudi.common.model.HoodieRecord.PARTITION_PATH_METADATA_
 import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
 import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
 
 /**
@@ -1488,7 +1488,7 @@ public class HoodieMetadataTableValidator implements Serializable {
       HoodieConfig hoodieConfig = new HoodieConfig();
       hoodieConfig.setValue(HoodieReaderConfig.USE_NATIVE_HFILE_READER,
           Boolean.toString(ConfigUtils.getBooleanWithAltKeys(props, HoodieReaderConfig.USE_NATIVE_HFILE_READER)));
-      try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+      try (HoodieFileReader fileReader = getHoodieSparkIOFactory().getReaderFactory(HoodieRecordType.AVRO)
           .getFileReader(hoodieConfig, metaClient.getStorageConf(), path)) {
         bloomFilter = fileReader.readBloomFilter();
         if (bloomFilter == null) {
