This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e658041a95 [HUDI-8552] Add a new merge handle based on file group reader for compaction in Spark (#12390)
4e658041a95 is described below

commit 4e658041a950b3b5bddccdbea440bae5ffa211cb
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Dec 4 18:19:01 2024 -0800

    [HUDI-8552] Add a new merge handle based on file group reader for compaction in Spark (#12390)
---
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  19 +-
 .../apache/hudi/table/EngineBroadcastManager.java  |  58 +++++
 .../apache/hudi/table/HoodieCompactionHandler.java |  11 +
 .../hudi/table/action/compact/HoodieCompactor.java |  59 ++++-
 .../client/common/HoodieSparkEngineContext.java    |   4 +
 ...HoodieSparkFileGroupReaderBasedMergeHandle.java | 268 +++++++++++++++++++++
 .../io/storage/HoodieSparkFileWriterFactory.java   |  13 +
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |  16 ++
 .../apache/hudi/table/SparkBroadcastManager.java   | 113 +++++++++
 .../HoodieSparkMergeOnReadTableCompactor.java      |   8 +
 .../SparkFileFormatInternalRowReaderContext.scala  |   6 +-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  27 +++
 .../hudi/common/engine/HoodieEngineContext.java    |   4 +
 .../apache/hudi/common/model/HoodieWriteStat.java  |  24 +-
 .../read/HoodieBaseFileGroupRecordBuffer.java      |  84 +++++--
 .../common/table/read/HoodieFileGroupReader.java   |   7 +
 .../table/read/HoodieFileGroupRecordBuffer.java    |   5 +
 .../read/HoodieKeyBasedFileGroupRecordBuffer.java  |   7 +-
 .../hudi/common/table/read/HoodieReadStats.java    |  72 ++++++
 .../testutils/reader/HoodieTestReaderContext.java  |   3 +
 .../hudi/hadoop/utils/ObjectInspectorCache.java    |   7 +-
 ...odieFileGroupReaderBasedParquetFileFormat.scala |   7 +-
 hudi-spark-datasource/hudi-spark/pom.xml           |   7 +
 .../hudi/client/TestHoodieClientMultiWriter.java   |   0
 .../TestMultiWriterWithPreferWriterIngestion.java  |   0
 .../hudi/client/TestTableSchemaEvolution.java      |   0
 .../functional/TestConsistentBucketIndex.java      |   0
 ...DataValidationCheckForLogCompactionActions.java |   0
 .../functional/TestHoodieBackedMetadata.java       |   0
 .../TestHoodieClientOnMergeOnReadStorage.java      |   0
 .../TestMetadataUtilRLIandSIRecordGeneration.java  |   0
 .../TestRemoteFileSystemViewWithMetadataTable.java |   0
 .../TestSavepointRestoreMergeOnRead.java           |   0
 .../hudi/table/TestHoodieMergeOnReadTable.java     |   0
 .../table/action/compact/TestAsyncCompaction.java  |  30 ++-
 .../table/action/compact/TestHoodieCompactor.java  |  13 +-
 .../table/action/compact/TestInlineCompaction.java |   0
 .../TestMergeOnReadRollbackActionExecutor.java     |   0
 .../TestHoodieSparkMergeOnReadTableCompaction.java |   0
 ...HoodieSparkMergeOnReadTableIncrementalRead.java |   0
 ...dieSparkMergeOnReadTableInsertUpdateDelete.java |   0
 .../TestHoodieSparkMergeOnReadTableRollback.java   |   4 +-
 .../TestSparkNonBlockingConcurrencyControl.java    |   0
 .../read/TestHoodieFileGroupReaderOnSpark.scala    |   2 +-
 .../TestSpark35RecordPositionMetadataColumn.scala  |   2 +-
 .../functional/TestColumnStatsIndexWithSQL.scala   |   5 +-
 .../hudi/functional/TestMORDataSourceStorage.scala |   8 +-
 .../hudi/dml/TestPartialUpdateForMergeInto.scala   |  38 ++-
 .../TestHoodieDeltaStreamerWithMultiWriter.java    |   1 +
 49 files changed, 837 insertions(+), 95 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 707c86dd73a..31f221fb85f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -113,7 +113,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
   protected long updatedRecordsWritten = 0;
   protected long insertRecordsWritten = 0;
   protected Option<BaseKeyGenerator> keyGeneratorOpt;
-  private HoodieBaseFile baseFileToMerge;
+  protected HoodieBaseFile baseFileToMerge;
 
   protected Option<String[]> partitionFields = Option.empty();
   protected Object[] partitionValues = new Object[0];
@@ -147,6 +147,21 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
     validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
   }
 
+  /**
+   * Used by `HoodieSparkFileGroupReaderBasedMergeHandle`.
+   *
+   * @param config              Hudi write config
+   * @param instantTime         Instant time to use
+   * @param partitionPath       Partition path
+   * @param fileId              File group ID for the merge handle to operate on
+   * @param hoodieTable         {@link HoodieTable} instance
+   * @param taskContextSupplier Task context supplier
+   */
+  public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
+                           String fileId, HoodieTable<T, I, K, O> hoodieTable, TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
+  }
+
   private void validateAndSetAndKeyGenProps(Option<BaseKeyGenerator> keyGeneratorOpt, boolean populateMetaFields) {
     ValidationUtils.checkArgument(populateMetaFields == !keyGeneratorOpt.isPresent());
     this.keyGeneratorOpt = keyGeneratorOpt;
@@ -467,7 +482,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
   }
 
   public void performMergeDataValidationCheck(WriteStatus writeStatus) {
-    if (!config.isMergeDataValidationCheckEnabled()) {
+    if (!config.isMergeDataValidationCheckEnabled() || baseFileToMerge == null) {
       return;
     }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/EngineBroadcastManager.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/EngineBroadcastManager.java
new file mode 100644
index 00000000000..bcee2829dd8
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/EngineBroadcastManager.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.Serializable;
+
+/**
+ * Broadcast variable management for engines.
+ */
+public class EngineBroadcastManager implements Serializable {
+
+  /**
+   * Prepares and broadcasts the necessary information needed by the compactor.
+   */
+  public void prepareAndBroadcast() {
+    // No operation.
+  }
+
+  /**
+   * Returns the {@link HoodieReaderContext} instance needed by the file group reader based on
+   * the broadcast variables.
+   *
+   * @param basePath Table base path
+   */
+  public Option<HoodieReaderContext> retrieveFileGroupReaderContext(StoragePath basePath) {
+    return Option.empty();
+  }
+
+  /**
+   * Retrieves the broadcast configuration.
+   */
+  public Option<Configuration> retrieveStorageConfig() {
+    return Option.empty();
+  }
+}
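
For context, the intended call sequence around this class (wired up in HoodieCompactor later in this patch) is sketched below; `engineContext` and `basePath` are placeholders for values the caller already has, and `SparkBroadcastManager` is the Spark subclass added further down:

    // Driver side: broadcast the reader and storage configuration once.
    EngineBroadcastManager manager = new SparkBroadcastManager(engineContext);
    manager.prepareAndBroadcast();
    // Executor side, per compaction operation: rebuild the reader context
    // and storage configuration from the broadcast variables.
    Option<HoodieReaderContext> readerContext =
        manager.retrieveFileGroupReaderContext(basePath);
    Option<Configuration> storageConf = manager.retrieveStorageConfig();
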
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java
index 4d6a216cf23..5af5213cc09 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieCompactionHandler.java
@@ -20,11 +20,15 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 
+import org.apache.hadoop.conf.Configuration;
+
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
@@ -42,6 +46,13 @@ public interface HoodieCompactionHandler<T> {
   Iterator<List<WriteStatus>> handleInsert(String instantTime, String partitionPath, String fileId,
                                            Map<String, HoodieRecord<?>> recordMap);
 
+  default List<WriteStatus> compactUsingFileGroupReader(String instantTime,
+                                                        CompactionOperation operation,
+                                                        HoodieReaderContext readerContext,
+                                                        Configuration conf) {
+    throw new HoodieNotSupportedException("This engine does not support file group reader based compaction.");
+  }
+
   default Iterator<List<WriteStatus>> handleInsertsForLogCompaction(String instantTime, String partitionPath, String fileId,
                                                            Map<String, HoodieRecord<?>> recordMap,
                                                            Map<HoodieLogBlock.HeaderMetadataType, String> header) {
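
Engines that do not override the new default fail fast; a minimal sketch of that behavior (the `handler`, `operation`, `readerContext`, and `conf` variables are placeholders):

    try {
      handler.compactUsingFileGroupReader(instantTime, operation, readerContext, conf);
    } catch (HoodieNotSupportedException e) {
      // "This engine does not support file group reader based compaction."
    }
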
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index e92c2a5d7c2..96f9316902d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.compact;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -45,6 +46,7 @@ import org.apache.hudi.internal.schema.utils.SerDeHelper;
 import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.EngineBroadcastManager;
 import org.apache.hudi.table.HoodieCompactionHandler;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
@@ -87,6 +89,15 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
    */
  public abstract void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieEngineContext context, HoodieWriteConfig config, String instantTime);
 
+  /**
+   * @param context {@link HoodieEngineContext} instance
+   *
+   * @return the {@link EngineBroadcastManager} if available.
+   */
+  public Option<EngineBroadcastManager> getEngineBroadcastManager(HoodieEngineContext context) {
+    return Option.empty();
+  }
+
   /**
    * Execute compaction operations and report back status.
    */
@@ -130,13 +141,32 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
     TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
     // if this is a MDT, set up the instant range of log reader just like regular MDT snapshot reader.
     Option<InstantRange> instantRange = CompactHelpers.getInstance().getInstantRange(metaClient);
-    return context.parallelize(operations).map(operation -> compact(
-        compactionHandler, metaClient, config, operation, compactionInstantTime, maxInstantTime, instantRange, taskContextSupplier, executionHelper))
-        .flatMap(List::iterator);
+
+    boolean useFileGroupReaderBasedCompaction = context.supportsFileGroupReader()  // the engine needs to support fg reader first
+        && !metaClient.isMetadataTable()
+        && config.getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
+        && !hasBootstrapFile(operations)                                           // bootstrap file read for fg reader is not ready
+        && StringUtils.isNullOrEmpty(config.getInternalSchema())                   // schema evolution support for fg reader is not ready
+        && !containsUnsupportedTypesForFileGroupReader(config.getSchema())         // Enum type support by fg reader is not ready
+        && config.populateMetaFields();                                            // Virtual key support by fg reader is not ready
+
+    if (useFileGroupReaderBasedCompaction) {
+      Option<EngineBroadcastManager> broadcastManagerOpt = getEngineBroadcastManager(context);
+      // Broadcast required information.
+      broadcastManagerOpt.ifPresent(EngineBroadcastManager::prepareAndBroadcast);
+      return context.parallelize(operations).map(
+              operation -> compact(compactionHandler, metaClient, operation, compactionInstantTime, broadcastManagerOpt))
+          .flatMap(List::iterator);
+    } else {
+      return context.parallelize(operations).map(
+              operation -> compact(compactionHandler, metaClient, config, operation, compactionInstantTime, maxInstantTime,
+                  instantRange, taskContextSupplier, executionHelper))
+          .flatMap(List::iterator);
+    }
   }
 
   /**
-   * Execute a single compaction operation and report back status.
+   * Executes a single compaction operation and reports back status.
    */
   public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
                                    HoodieTableMetaClient metaClient,
@@ -237,6 +267,7 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
     Iterator<List<WriteStatus>> result;
     result = executionHelper.writeFileAndGetWriteStats(compactionHandler, operation, instantTime, scanner, oldDataFileOpt);
     scanner.close();
+
     Iterable<List<WriteStatus>> resultIterable = () -> result;
    return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> {
      final HoodieWriteStat stat = s.getStat();
@@ -261,6 +292,19 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
     }).collect(toList());
   }
 
+  /**
+   * Executes a single compaction operation using the file group reader and reports back status.
+   */
+  public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
+                                   HoodieTableMetaClient metaClient,
+                                   CompactionOperation operation,
+                                   String instantTime,
+                                   Option<EngineBroadcastManager> broadcastManagerOpt) throws IOException {
+    return compactionHandler.compactUsingFileGroupReader(instantTime,
+        operation, broadcastManagerOpt.get().retrieveFileGroupReaderContext(metaClient.getBasePath()).get(),
+        broadcastManagerOpt.get().retrieveStorageConfig().get());
+  }
+
   public String getMaxInstantTime(HoodieTableMetaClient metaClient) {
    String maxInstantTime = metaClient
        .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
@@ -278,4 +322,11 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
     }
   }
 
+  private boolean hasBootstrapFile(List<CompactionOperation> operationList) {
+    return operationList.stream().anyMatch(operation -> operation.getBootstrapFilePath().isPresent());
+  }
+
+  private boolean containsUnsupportedTypesForFileGroupReader(String schemaStr) {
+    return HoodieAvroUtils.containsUnsupportedTypesForFileGroupReader(new Schema.Parser().parse(schemaStr));
+  }
 }
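
Because the new path is gated on the file group reader flag, it can be disabled per writer even on engines that support it; a hedged sketch using the standard write-config builder (`basePath` is a placeholder):

    Properties props = new Properties();
    // Forces executeCompaction(...) above onto the legacy compact path.
    props.setProperty(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withProps(props)
        .build();
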
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index b1763634bc0..6485e8e6ad4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -108,6 +108,10 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
     return HoodieJavaRDD.of(javaSparkContext.emptyRDD());
   }
 
+  public boolean supportsFileGroupReader() {
+    return true;
+  }
+
   @Override
   public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
     return HoodieJavaRDD.of(javaSparkContext.parallelize(data, parallelism));
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
new file mode 100644
index 00000000000..b7d8d2f80f2
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader.HoodieFileGroupReaderIterator;
+import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.config.HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS;
+
+/**
+ * A merge handle implementation based on the {@link HoodieFileGroupReader}.
+ * <p>
+ * This merge handle is used for compaction on Spark: it passes the file slice from the
+ * compaction operation of a single file group to a file group reader, gets an iterator of
+ * the records, and writes the records to a new base file.
+ */
+@NotThreadSafe
+public class HoodieSparkFileGroupReaderBasedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T, I, K, O> {
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieSparkFileGroupReaderBasedMergeHandle.class);
+
+  protected HoodieReaderContext readerContext;
+  protected FileSlice fileSlice;
+  protected Configuration conf;
+
+  public HoodieSparkFileGroupReaderBasedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
+                                                    CompactionOperation operation, TaskContextSupplier taskContextSupplier,
+                                                    Option<BaseKeyGenerator> keyGeneratorOpt,
+                                                    HoodieReaderContext readerContext, Configuration conf) {
+    super(config, instantTime, operation.getPartitionPath(), operation.getFileId(), hoodieTable, taskContextSupplier);
+    this.keyToNewRecords = Collections.emptyMap();
+    this.readerContext = readerContext;
+    this.conf = conf;
+    Option<HoodieBaseFile> baseFileOpt =
+        operation.getBaseFile(config.getBasePath(), operation.getPartitionPath());
+    List<HoodieLogFile> logFiles = operation.getDeltaFileNames().stream().map(p ->
+            new HoodieLogFile(new StoragePath(FSUtils.constructAbsolutePath(
+                config.getBasePath(), operation.getPartitionPath()), p)))
+        .collect(Collectors.toList());
+    this.fileSlice = new FileSlice(
+        operation.getFileGroupId(),
+        operation.getBaseInstantTime(),
+        baseFileOpt.isPresent() ? baseFileOpt.get() : null,
+        logFiles);
+    this.preserveMetadata = true;
+    init(operation, this.partitionPath, baseFileOpt);
+    validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
+  }
+
+  private void validateAndSetAndKeyGenProps(Option<BaseKeyGenerator> keyGeneratorOpt, boolean populateMetaFields) {
+    ValidationUtils.checkArgument(populateMetaFields == !keyGeneratorOpt.isPresent());
+    this.keyGeneratorOpt = keyGeneratorOpt;
+  }
+
+  private void init(CompactionOperation operation, String partitionPath, Option<HoodieBaseFile> baseFileToMerge) {
+    LOG.info("partitionPath:" + partitionPath + ", fileId to be merged:" + fileId);
+    this.baseFileToMerge = baseFileToMerge.orElse(null);
+    this.writtenRecordKeys = new HashSet<>();
+    writeStatus.setStat(new HoodieWriteStat());
+    writeStatus.getStat().setTotalLogSizeCompacted(
+        operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
+    try {
+      Option<String> latestValidFilePath = Option.empty();
+      if (baseFileToMerge.isPresent()) {
+        latestValidFilePath = Option.of(baseFileToMerge.get().getFileName());
+        writeStatus.getStat().setPrevCommit(baseFileToMerge.get().getCommitTime());
+        // At the moment, we only support SI for overwrite with latest payload. So, we don't need to embed the entire file slice here.
+        // HUDI-8518 will be taken up to fix it for any payload, in which case we might require the entire file slice to be set here.
+        // AppendHandle already adds all log files from the current file slice to HoodieDeltaWriteStat.
+        writeStatus.getStat().setPrevBaseFile(latestValidFilePath.get());
+      } else {
+        writeStatus.getStat().setPrevCommit(HoodieWriteStat.NULL_COMMIT);
+      }
+
+      HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(storage, instantTime,
+          new StoragePath(config.getBasePath()),
+          FSUtils.constructAbsolutePath(config.getBasePath(), partitionPath),
+          hoodieTable.getPartitionMetafileFormat());
+      partitionMetadata.trySave();
+
+      String newFileName = FSUtils.makeBaseFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension());
+      makeOldAndNewFilePaths(partitionPath,
+          latestValidFilePath.isPresent() ? latestValidFilePath.get() : null, newFileName);
+
+      LOG.info(String.format(
+          "Merging data from file group %s, to a new base file %s", fileId, newFilePath.toString()));
+      // file name is the same for all records in this bunch
+      writeStatus.setFileId(fileId);
+      writeStatus.setPartitionPath(partitionPath);
+      writeStatus.getStat().setPartitionPath(partitionPath);
+      writeStatus.getStat().setFileId(fileId);
+      setWriteStatusPath();
+
+      // Create the marker file,
+      // using the name of `newFilePath` instead of `newFileName`,
+      // in case a sub-class rolls over the file handle name.
+      createMarkerFile(partitionPath, newFilePath.getName());
+
+      // Create the writer for writing the new version file
+      fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, newFilePath, hoodieTable.getStorage(),
+          config, writeSchemaWithMetaFields, taskContextSupplier, HoodieRecord.HoodieRecordType.SPARK);
+    } catch (IOException io) {
+      LOG.error("Error in update task at commit " + instantTime, io);
+      writeStatus.setGlobalError(io);
+      throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " on commit "
+          + instantTime + " on path " + hoodieTable.getMetaClient().getBasePath(), io);
+    }
+  }
+
+  /**
+   * Reads the file slice of a compaction operation using a file group reader,
+   * gets an iterator of the records, and writes the records to a new base file
+   * using the Spark parquet writer.
+   */
+  public void write() {
+    boolean usePosition = config.getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
+    Option<InternalSchema> internalSchemaOption = Option.empty();
+    if (!StringUtils.isNullOrEmpty(config.getInternalSchema())) {
+      internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema());
+    }
+    // Initializes file group reader
+    try (HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(
+        readerContext,
+        storage.newInstance(hoodieTable.getMetaClient().getBasePath(), new HadoopStorageConfiguration(conf)),
+        hoodieTable.getMetaClient().getBasePath().toString(),
+        instantTime,
+        fileSlice,
+        writeSchemaWithMetaFields,
+        writeSchemaWithMetaFields,
+        internalSchemaOption,
+        hoodieTable.getMetaClient(),
+        hoodieTable.getMetaClient().getTableConfig().getProps(),
+        0,
+        Long.MAX_VALUE,
+        usePosition)) {
+      fileGroupReader.initRecordIterators();
+      // Reads the records from the file slice
+      try (HoodieFileGroupReaderIterator<InternalRow> recordIterator
+               = (HoodieFileGroupReaderIterator<InternalRow>) fileGroupReader.getClosableIterator()) {
+        StructType sparkSchema = AvroConversionUtils.convertAvroSchemaToStructType(writeSchemaWithMetaFields);
+        while (recordIterator.hasNext()) {
+          // Constructs a Spark record for the Spark Parquet file writer
+          InternalRow row = recordIterator.next();
+          HoodieKey recordKey = new HoodieKey(
+              row.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD),
+              row.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD));
+          HoodieSparkRecord record = new HoodieSparkRecord(recordKey, row, sparkSchema, false);
+          Option recordMetadata = record.getMetadata();
+          if (!partitionPath.equals(record.getPartitionPath())) {
+            HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
+                + record.getPartitionPath() + " but trying to insert into partition: " + partitionPath);
+            writeStatus.markFailure(record, failureEx, recordMetadata);
+            continue;
+          }
+          // Writes the record
+          try {
+            writeToFile(recordKey, record, writeSchemaWithMetaFields,
+                config.getPayloadConfig().getProps(), preserveMetadata);
+            writeStatus.markSuccess(record, recordMetadata);
+          } catch (Exception e) {
+            LOG.error("Error writing record " + record, e);
+            writeStatus.markFailure(record, e, recordMetadata);
+          }
+        }
+
+        // The stats of inserts, updates, and deletes are updated once at the end.
+        // These will be set in the write stat when closing the merge handle.
+        HoodieReadStats stats = fileGroupReader.getStats();
+        this.insertRecordsWritten = stats.getNumInserts();
+        this.updatedRecordsWritten = stats.getNumUpdates();
+        this.recordsDeleted = stats.getNumDeletes();
+        this.recordsWritten = stats.getNumInserts() + stats.getNumUpdates();
+      }
+    } catch (IOException e) {
+      throw new HoodieUpsertException("Failed to compact file slice: " + fileSlice, e);
+    }
+  }
+
+  /**
+   * Writes a single record to the new file.
+   *
+   * @param key                          record key
+   * @param record                       the record of {@link HoodieSparkRecord}
+   * @param schema                       record schema
+   * @param prop                         table properties
+   * @param shouldPreserveRecordMetadata whether to preserve the record meta fields
+   *
+   * @throws IOException upon errors in writing the record
+   */
+  protected void writeToFile(HoodieKey key, HoodieSparkRecord record, Schema schema, Properties prop, boolean shouldPreserveRecordMetadata)
+      throws IOException {
+    // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
+    //       file holding this record even in cases when overall metadata is preserved
+    MetadataValues metadataValues = new MetadataValues().setFileName(newFilePath.getName());
+    HoodieRecord populatedRecord = record.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, prop);
+
+    if (shouldPreserveRecordMetadata) {
+      fileWriter.write(key.getRecordKey(), populatedRecord, writeSchemaWithMetaFields);
+    } else {
+      fileWriter.writeWithMetadata(key, populatedRecord, writeSchemaWithMetaFields);
+    }
+  }
+
+  @Override
+  protected void writeIncomingRecords() {
+    // no operation.
+  }
+}
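
The handle is driven in two calls, as the HoodieSparkCopyOnWriteTable change below shows; a minimal sketch (all arguments are placeholders for values the compaction handler already has):

    HoodieSparkFileGroupReaderBasedMergeHandle mergeHandle =
        new HoodieSparkFileGroupReaderBasedMergeHandle(writeConfig, instantTime, hoodieTable,
            operation, taskContextSupplier, keyGeneratorOpt, readerContext, hadoopConf);
    mergeHandle.write();                               // stream merged records into the new base file
    List<WriteStatus> statuses = mergeHandle.close();  // finalize write stats and return statuses
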
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
index eedc560bdaf..afb7cf7c72c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.row.HoodieRowParquetConfig;
 import org.apache.hudi.io.storage.row.HoodieRowParquetWriteSupport;
@@ -35,12 +36,16 @@ import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.spark.sql.HoodieDataTypeUtils;
 import org.apache.spark.sql.HoodieInternalRowUtils;
 import org.apache.spark.sql.types.StructType;
 
 import java.io.IOException;
 import java.io.OutputStream;
 
+import static org.apache.hudi.config.HoodieWriteConfig.AVRO_SCHEMA_STRING;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_SCHEMA_OVERRIDE;
+
 public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
 
   public HoodieSparkFileWriterFactory(HoodieStorage storage) {
@@ -51,6 +56,14 @@ public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
   protected HoodieFileWriter newParquetFileWriter(
       String instantTime, StoragePath path, HoodieConfig config, Schema schema,
       TaskContextSupplier taskContextSupplier) throws IOException {
+    String writeSchema = config.getStringOrDefault(
+        WRITE_SCHEMA_OVERRIDE, config.getString(AVRO_SCHEMA_STRING));
+    if (!StringUtils.isNullOrEmpty(writeSchema)) {
+      // The parquet write legacy format property needs to be overridden
+      // if there is a decimal field of small precision, to maintain compatibility.
+      HoodieDataTypeUtils.tryOverrideParquetWriteLegacyFormatProperty(
+          config.getProps(), new Schema.Parser().parse(writeSchema));
+    }
     boolean populateMetaFields = config.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS);
     String compressionCodecName = config.getStringOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
     // Support an empty PARQUET_COMPRESSION_CODEC_NAME ("")
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 8848a2bb3c7..67a7658a309 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -33,6 +33,8 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -47,6 +49,7 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.io.HoodieCreateHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.HoodieMergeHandleFactory;
+import org.apache.hudi.io.HoodieSparkFileGroupReaderBasedMergeHandle;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.metadata.MetadataPartitionType;
@@ -77,6 +80,7 @@ import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
 import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor;
 import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
 
+import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -264,6 +268,18 @@ public class HoodieSparkCopyOnWriteTable<T>
     return Collections.singletonList(createHandle.close()).iterator();
   }
 
+  @Override
+  public List<WriteStatus> compactUsingFileGroupReader(String instantTime,
+                                                       CompactionOperation operation,
+                                                       HoodieReaderContext readerContext,
+                                                       Configuration conf) {
+    Option<BaseKeyGenerator> keyGeneratorOpt = HoodieSparkKeyGeneratorFactory.createBaseKeyGenerator(config);
+    HoodieSparkFileGroupReaderBasedMergeHandle mergeHandle = new HoodieSparkFileGroupReaderBasedMergeHandle(config,
+        instantTime, this, operation, taskContextSupplier, keyGeneratorOpt, readerContext, conf);
+    mergeHandle.write();
+    return mergeHandle.close();
+  }
+
   @Override
  public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) {
    return new CleanActionExecutor<>(context, config, this, cleanInstantTime, false).execute();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkBroadcastManager.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkBroadcastManager.java
new file mode 100644
index 00000000000..19b2b0aa241
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkBroadcastManager.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.SparkAdapterSupport$;
+import org.apache.hudi.SparkFileFormatInternalRowReaderContext;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.execution.datasources.FileFormat;
+import org.apache.spark.sql.execution.datasources.parquet.SparkParquetReader;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.util.SerializableConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ * Broadcast variable management for Spark.
+ */
+public class SparkBroadcastManager extends EngineBroadcastManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkBroadcastManager.class);
+
+  private final transient HoodieEngineContext context;
+
+  protected Option<SparkParquetReader> parquetReaderOpt = Option.empty();
+  protected Broadcast<SQLConf> sqlConfBroadcast;
+  protected Broadcast<SparkParquetReader> parquetReaderBroadcast;
+  protected Broadcast<SerializableConfiguration> configurationBroadcast;
+
+  public SparkBroadcastManager(HoodieEngineContext context) {
+    this.context = context;
+  }
+
+  @Override
+  public void prepareAndBroadcast() {
+    if (!(context instanceof HoodieSparkEngineContext)) {
+      throw new HoodieIOException("Expected to be called using Engine's context and not local context");
+    }
+
+    HoodieSparkEngineContext hoodieSparkEngineContext = (HoodieSparkEngineContext) context;
+    SQLConf sqlConf = hoodieSparkEngineContext.getSqlContext().sessionState().conf();
+    JavaSparkContext jsc = hoodieSparkEngineContext.jsc();
+
+    boolean returningBatch = sqlConf.parquetVectorizedReaderEnabled();
+    scala.collection.immutable.Map<String, String> options =
+        scala.collection.immutable.Map$.MODULE$.<String, String>empty()
+            .$plus(new Tuple2<>(FileFormat.OPTION_RETURNING_BATCH(), Boolean.toString(returningBatch)));
+
+    // Do broadcast.
+    sqlConfBroadcast = jsc.broadcast(sqlConf);
+    configurationBroadcast = jsc.broadcast(new SerializableConfiguration(jsc.hadoopConfiguration()));
+    // Spark parquet reader has to be instantiated on the driver and broadcast to the executors
+    parquetReaderOpt = Option.of(SparkAdapterSupport$.MODULE$.sparkAdapter().createParquetFileReader(
+        false, sqlConfBroadcast.getValue(), options, configurationBroadcast.getValue().value()));
+    parquetReaderBroadcast = jsc.broadcast(parquetReaderOpt.get());
+  }
+
+  @Override
+  public Option<HoodieReaderContext> retrieveFileGroupReaderContext(StoragePath basePath) {
+    if (parquetReaderBroadcast == null) {
+      throw new HoodieException("Spark Parquet reader broadcast is not initialized.");
+    }
+
+    SparkParquetReader sparkParquetReader = parquetReaderBroadcast.getValue();
+    if (sparkParquetReader != null) {
+      List<Filter> filters = new ArrayList<>();
+      return Option.of(new SparkFileFormatInternalRowReaderContext(
+          sparkParquetReader,
+          JavaConverters.asScalaBufferConverter(filters).asScala().toSeq(),
+          JavaConverters.asScalaBufferConverter(filters).asScala().toSeq()));
+    } else {
+      throw new HoodieException("Cannot get the broadcast Spark Parquet reader.");
+    }
+  }
+
+  @Override
+  public Option<Configuration> retrieveStorageConfig() {
+    return Option.of(configurationBroadcast.getValue().value());
+  }
+}
\ No newline at end of file
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
index d47eb6d33aa..bac394a8da5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
@@ -27,8 +27,11 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.EngineBroadcastManager;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.SparkBroadcastManager;
 
 import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;
 
@@ -41,6 +44,11 @@ import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVE
 public class HoodieSparkMergeOnReadTableCompactor<T>
     extends HoodieCompactor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
 
+  @Override
+  public Option<EngineBroadcastManager> getEngineBroadcastManager(HoodieEngineContext context) {
+    return Option.of(new SparkBroadcastManager(context));
+  }
+
   @Override
   public void preCompact(
      HoodieTable table, HoodieTimeline pendingCompactionTimeline, WriteOperationType operationType, String instantTime) {
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 00e14e06bf6..06d13e12a01 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -20,8 +20,8 @@
 package org.apache.hudi
 
 import org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForBootstrap, getAppliedRequiredSchema}
-import org.apache.hudi.avro.AvroSchemaUtils.isNullable
 import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
+import org.apache.hudi.avro.AvroSchemaUtils.isNullable
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
@@ -45,7 +45,7 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Sp
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.{LongType, MetadataBuilder, StructField, StructType}
-import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.apache.spark.unsafe.types.UTF8String
 
 import scala.collection.mutable
@@ -59,12 +59,10 @@ import scala.collection.mutable
  * @param parquetFileReader A reader that transforms a [[PartitionedFile]] to an iterator of
  *                          [[InternalRow]]. This is required for reading the base file and
  *                          not required for reading a file group with only log files.
- * @param recordKeyColumn   column name for the recordkey
  * @param filters           spark filters that might be pushed down into the reader
  * @param requiredFilters   filters that are required and should always be used, even in merging situations
  */
 class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetReader,
-                                              recordKeyColumn: String,
                                               filters: Seq[Filter],
                                               requiredFilters: Seq[Filter]) extends BaseSparkInternalRowReaderContext {
   lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 72bd8950f01..06380d36419 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -1488,4 +1488,31 @@ public class HoodieAvroUtils {
     }
   }
 
+  /**
+   * Returns whether the schema contains types not supported by the file group reader.
+   * Currently, only the Avro ENUM type has known issues.
+   *
+   * @param schema Avro schema
+   *
+   * @return whether the schema contains types not supported by the file group reader.
+   */
+  public static boolean containsUnsupportedTypesForFileGroupReader(Schema schema) {
+    switch (schema.getType()) {
+      case RECORD:
+        for (Field field : schema.getFields()) {
+          if (containsUnsupportedTypesForFileGroupReader(field.schema())) {
+            return true;
+          }
+        }
+        return false;
+      case ARRAY:
+        return containsUnsupportedTypesForFileGroupReader(schema.getElementType());
+      case MAP:
+        return containsUnsupportedTypesForFileGroupReader(schema.getValueType());
+      case UNION:
+        return containsUnsupportedTypesForFileGroupReader(getActualSchemaFromUnion(schema, null));
+      default:
+        return schema.getType() == Schema.Type.ENUM;
+    }
+  }
 }
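
A hedged example of the check: a record schema with a nested ENUM field (hypothetical schema) is flagged as unsupported:

    Schema withEnum = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":["
            + "{\"name\":\"color\",\"type\":{\"type\":\"enum\",\"name\":\"Color\","
            + "\"symbols\":[\"RED\",\"GREEN\"]}}]}");
    // true: the ENUM field disqualifies the schema from the file group reader path
    boolean unsupported = HoodieAvroUtils.containsUnsupportedTypesForFileGroupReader(withEnum);
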
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index b16f9330292..8f5e7ebaa22 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -67,6 +67,10 @@ public abstract class HoodieEngineContext {
 
   public abstract <T> HoodieData<T> emptyHoodieData();
 
+  public boolean supportsFileGroupReader() {
+    return false;
+  }
+
   public <T> HoodieData<T> parallelize(List<T> data) {
     if (data.isEmpty()) {
       return emptyHoodieData();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
index 6d7ca6d5182..5a415847277 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.model;
 
+import org.apache.hudi.common.table.read.HoodieReadStats;
 import org.apache.hudi.common.util.JsonUtils;
 import org.apache.hudi.storage.StoragePath;
 
@@ -29,7 +30,7 @@ import java.util.Map;
 /**
  * Statistics about a single Hoodie write operation.
  */
-public class HoodieWriteStat implements Serializable {
+public class HoodieWriteStat extends HoodieReadStats {
 
   public static final String NULL_COMMIT = "null";
 
@@ -55,25 +56,15 @@ public class HoodieWriteStat implements Serializable {
 
   /**
   * Total number of records written for this file. - for updates, its the entire number of records in the file - for
-   * inserts, its the actual number of records inserted.
+   * inserts, it's the actual number of records inserted.
    */
   private long numWrites;
 
-  /**
-   * Total number of records deleted.
-   */
-  private long numDeletes;
-
   /**
    * Total number of records actually changed. (0 for inserts)
    */
   private long numUpdateWrites;
 
-  /**
-   * Total number of insert records or converted to updates (for small file handling).
-   */
-  private long numInserts;
-
   /**
    * Total number of bytes written.
    */
@@ -221,18 +212,10 @@ public class HoodieWriteStat implements Serializable {
     return numWrites;
   }
 
-  public long getNumDeletes() {
-    return numDeletes;
-  }
-
   public long getNumUpdateWrites() {
     return numUpdateWrites;
   }
 
-  public long getNumInserts() {
-    return numInserts;
-  }
-
   public String getFileId() {
     return fileId;
   }
@@ -418,6 +401,7 @@ public class HoodieWriteStat implements Serializable {
    * The runtime stats for writing operation.
    */
   public static class RuntimeStats implements Serializable {
+    private static final long serialVersionUID = 1L;
 
     /**
      * Total time taken to read and merge logblocks in a log file.
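
With this change the insert/delete counters move to the new HoodieReadStats parent, so existing call sites keep compiling; a small sketch (assuming the getters are inherited unchanged):

    HoodieWriteStat stat = new HoodieWriteStat();
    // getNumInserts()/getNumDeletes() are now inherited from HoodieReadStats
    // rather than declared on HoodieWriteStat itself.
    long inserts = stat.getNumInserts();
    long deletes = stat.getNumDeletes();
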
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index e0014460916..e4e0299ec86 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -71,6 +72,8 @@ import static org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BAS
 import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_PARTITION_PATH;
 import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
 import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_SCHEMA;
+import static org.apache.hudi.common.model.HoodieRecord.HOODIE_IS_DELETED_FIELD;
+import static org.apache.hudi.common.model.HoodieRecord.OPERATION_METADATA_FIELD;
 import static org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
 
@@ -87,6 +90,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
   protected final Option<String> payloadClass;
   protected final TypedProperties props;
   protected final ExternalSpillableMap<Serializable, Pair<Option<T>, Map<String, Object>>> records;
+  protected final HoodieReadStats readerStats;
   protected ClosableIterator<T> baseFileIterator;
   protected Iterator<Pair<Option<T>, Map<String, Object>>> logRecordIterator;
   protected T nextRecord;
@@ -123,6 +127,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
         SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT));
     boolean isBitCaskDiskMapCompressionEnabled = props.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
         DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
+    this.readerStats = new HoodieReadStats();
     try {
       // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
       this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator<>(),
@@ -137,6 +142,11 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
     this.baseFileIterator = baseFileIterator;
   }
 
+  @Override
+  public HoodieReadStats getStats() {
+    return readerStats;
+  }
+
   /**
    * This allows hasNext() to be called multiple times without incrementing the iterator by more than 1
    * record. It does come with the caveat that hasNext() must be called every time before next(). But
@@ -185,9 +195,10 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
    * @return
    * @throws IOException
    */
-  protected Option<Pair<T, Map<String, Object>>> doProcessNextDataRecord(T record,
-                                                                         Map<String, Object> metadata,
-                                                                         Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) throws IOException {
+  protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T record,
+                                                                                 Map<String, Object> metadata,
+                                                                                 Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair)
+      throws IOException {
     if (existingRecordMetadataPair != null) {
       if (enablePartialMerging) {
         // TODO(HUDI-7843): decouple the merging logic from the merger
@@ -212,7 +223,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
         // If pre-combine returns existing record, no need to update it
         if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().get()) {
           return Option.of(Pair.of(
-              combinedRecord.getData(),
+              Option.ofNullable(combinedRecord.getData()),
               readerContext.updateSchemaAndResetOrderingValInMetadata(metadata, combinedRecordAndSchema.getRight())));
         }
         return Option.empty();
@@ -230,7 +241,8 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
             Comparable incomingOrderingValue = readerContext.getOrderingValue(
                 Option.of(record), metadata, readerSchema, orderingFieldName, orderingFieldTypeOpt, orderingFieldDefault);
             if (incomingOrderingValue.compareTo(existingOrderingValue) > 0) {
-              return Option.of(Pair.of(record, metadata));
+              return Option.of(Pair.of(isDeleteRecord(Option.of(record), (Schema) metadata.get(INTERNAL_META_SCHEMA))
+                  ? Option.empty() : Option.of(record), metadata));
             }
             return Option.empty();
           case CUSTOM:
@@ -245,7 +257,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
                T combinedRecordData = readerContext.convertAvroRecord((IndexedRecord) combinedRecordAndSchemaOpt.get().getLeft().getData());
                // If pre-combine does not return existing record, update it
                if (combinedRecordData != existingRecordMetadataPair.getLeft().get()) {
-                  return Option.of(Pair.of(combinedRecordData, metadata));
+                  return Option.of(Pair.of(Option.ofNullable(combinedRecordData), metadata));
                 }
               }
 
@@ -268,7 +280,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
 
               // If pre-combine returns existing record, no need to update it
              if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().get()) {
-                return Option.of(Pair.of(combinedRecord.getData(), metadata));
+                return Option.of(Pair.of(Option.ofNullable(combinedRecord.getData()), metadata));
               }
 
               return Option.empty();
@@ -280,7 +292,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
      // NOTE: Records have to be cloned here to make sure that if they hold a low-level engine-specific
      //       payload pointing into a shared, mutable (underlying) buffer, we get a clean copy,
      //       since these records will be put into the records map.
-      return Option.of(Pair.of(record, metadata));
+      return Option.of(Pair.of(Option.ofNullable(record), metadata));
     }
   }
 
@@ -379,7 +391,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
   protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
                            Option<T> newer, Map<String, Object> newerInfoMap) throws IOException {
    if (!older.isPresent()) {
-      return newer;
+      return isDeleteRecord(newer, (Schema) newerInfoMap.get(INTERNAL_META_SCHEMA)) ? Option.empty() : newer;
     }
 
     if (enablePartialMerging) {
@@ -401,22 +413,20 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
     } else {
       switch (recordMergeMode) {
         case COMMIT_TIME_ORDERING:
-          return newer;
+          return isDeleteRecord(newer, (Schema) newerInfoMap.get(INTERNAL_META_SCHEMA)) ? Option.empty() : newer;
         case EVENT_TIME_ORDERING:
-          Comparable oldOrderingValue = readerContext.getOrderingValue(
-              older, olderInfoMap, readerSchema, orderingFieldName, orderingFieldTypeOpt, orderingFieldDefault);
-          if (isDeleteRecordWithNaturalOrder(older, oldOrderingValue)) {
-            return newer;
-          }
           Comparable newOrderingValue = readerContext.getOrderingValue(
               newer, newerInfoMap, readerSchema, orderingFieldName, orderingFieldTypeOpt, orderingFieldDefault);
           if (isDeleteRecordWithNaturalOrder(newer, newOrderingValue)) {
             return Option.empty();
           }
+          Comparable oldOrderingValue = readerContext.getOrderingValue(
+              older, olderInfoMap, readerSchema, orderingFieldName, orderingFieldTypeOpt, orderingFieldDefault);
+          if (!isDeleteRecordWithNaturalOrder(older, oldOrderingValue)
+              && oldOrderingValue.compareTo(newOrderingValue) > 0) {
+            return isDeleteRecord(older, (Schema) olderInfoMap.get(INTERNAL_META_SCHEMA)) ? Option.empty() : older;
           }
-          return newer;
+          return isDeleteRecord(newer, (Schema) newerInfoMap.get(INTERNAL_META_SCHEMA)) ? Option.empty() : newer;
         case CUSTOM:
         default:
           if (payloadClass.isPresent()) {
@@ -495,14 +505,24 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
     Map<String, Object> metadata = readerContext.generateMetadataForRecord(
         baseRecord, readerSchema);
 
-    Option<T> resultRecord = logRecordInfo != null
-        ? merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight())
-        : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), metadata);
-    if (resultRecord.isPresent()) {
-      nextRecord = readerContext.seal(resultRecord.get());
-      return true;
+    if (logRecordInfo != null) {
+      Option<T> resultRecord = merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight());
+      if (resultRecord.isPresent()) {
+        // Updates
+        nextRecord = readerContext.seal(resultRecord.get());
+        readerStats.incrementNumUpdates();
+        return true;
+      } else {
+        // Deletes
+        readerStats.incrementNumDeletes();
+        return false;
+      }
     }
-    return false;
+
+    // Inserts
+    nextRecord = readerContext.seal(baseRecord);
+    readerStats.incrementNumInserts();
+    return true;
   }
 
   protected boolean hasNextLogRecord() throws IOException {
@@ -542,4 +562,18 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
                                                  Comparable orderingValue) {
     return rowOption.isEmpty() && orderingValue.equals(orderingFieldDefault);
   }
+
+  private boolean isDeleteRecord(Option<T> record, Schema schema) {
+    if (record.isEmpty()) {
+      return true;
+    }
+
+    Object operation = readerContext.getValue(record.get(), schema, OPERATION_METADATA_FIELD);
+    if (operation != null && HoodieOperation.isDeleteRecord(operation.toString())) {
+      return true;
+    }
+
+    Object deleteMarker = readerContext.getValue(record.get(), schema, HOODIE_IS_DELETED_FIELD);
+    return deleteMarker instanceof Boolean && (boolean) deleteMarker;
+  }
 }
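
The new isDeleteRecord helper treats a record as a delete when it is absent, when its operation meta field carries a delete marker, or when its _hoodie_is_deleted column is set. A minimal sketch of the same two-signal check over a plain Avro GenericRecord; the field names and the "D" operation code here are assumptions for illustration, not taken from this patch:

    import org.apache.avro.generic.GenericRecord;

    final class DeleteSignals {
      // Sketch only: mirrors the delete signals consulted by isDeleteRecord,
      // using plain Avro instead of the engine-specific record type T.
      static boolean looksLikeDelete(GenericRecord record) {
        if (record == null) {
          return true; // an absent record is treated as a delete
        }
        // Signal 1: the operation meta field marks the record as a delete
        // (assumed field name and code).
        if (record.getSchema().getField("_hoodie_operation") != null) {
          Object operation = record.get("_hoodie_operation");
          if (operation != null && "D".equals(operation.toString())) {
            return true;
          }
        }
        // Signal 2: the boolean _hoodie_is_deleted column is set to true.
        if (record.getSchema().getField("_hoodie_is_deleted") != null) {
          Object deleteMarker = record.get("_hoodie_is_deleted");
          return deleteMarker instanceof Boolean && (Boolean) deleteMarker;
        }
        return false;
      }
    }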
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index ac34efb0ab2..5bac955e22d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.CachingIterator;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.EmptyIterator;
@@ -236,6 +237,12 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     }
   }
 
+  public HoodieReadStats getStats() {
+    ValidationUtils.checkArgument(recordBuffer != null,
+        "Only support getting reader stats from log merging now");
+    return recordBuffer.getStats();
+  }
+
   /**
    * @return The next record after calling {@link #hasNext}.
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
index d9ba8bcd90e..139c3529475 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
@@ -107,6 +107,11 @@ public interface HoodieFileGroupRecordBuffer<T> {
    */
   void setBaseFileIterator(ClosableIterator<T> baseFileIterator);
 
+  /**
+   * @return statistics collected while merging base file and log records.
+   */
+  HoodieReadStats getStats();
+
   /**
    * Check if next merged record exists.
    *
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
index 82e976f4b27..97712fe6be7 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -86,11 +86,14 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends 
HoodieBaseFileGroupR
   @Override
   public void processNextDataRecord(T record, Map<String, Object> metadata, Serializable recordKey) throws IOException {
     Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = records.get(recordKey);
-    Option<Pair<T, Map<String, Object>>> mergedRecordAndMetadata =
+    Option<Pair<Option<T>, Map<String, Object>>> mergedRecordAndMetadata =
         doProcessNextDataRecord(record, metadata, existingRecordMetadataPair);
+
     if (mergedRecordAndMetadata.isPresent()) {
       records.put(recordKey, Pair.of(
-          Option.ofNullable(readerContext.seal(mergedRecordAndMetadata.get().getLeft())),
+          mergedRecordAndMetadata.get().getLeft().isPresent()
+              ? Option.ofNullable(readerContext.seal(mergedRecordAndMetadata.get().getLeft().get()))
+              : Option.empty(),
           mergedRecordAndMetadata.get().getRight()));
     }
   }
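
The left side of the buffered pair is now an Option, so a merge that resolves to a delete no longer drops the map entry: it is kept as an empty-Option tombstone that can be counted later. A sketch of the entry shape under that convention, using Hudi's Option and Pair types from the diff above; a HashMap stands in for the spillable map, and `mergedRecord`/`metadata` are placeholders:

    import org.apache.avro.generic.IndexedRecord;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.common.util.collection.Pair;

    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;

    // Sketch: Option.empty() on the left encodes a delete tombstone.
    Map<Serializable, Pair<Option<IndexedRecord>, Map<String, Object>>> buffer = new HashMap<>();
    IndexedRecord mergedRecord = null;           // placeholder for a merged engine record
    Map<String, Object> metadata = new HashMap<>();
    buffer.put("key-1", Pair.of(Option.ofNullable(mergedRecord), metadata)); // data record
    buffer.put("key-2", Pair.of(Option.empty(), metadata));                  // delete tombstone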
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieReadStats.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieReadStats.java
new file mode 100644
index 00000000000..d3617dba499
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieReadStats.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Serializable;
+
+/**
+ * Statistics about a single Hoodie read operation.
+ */
+@NotThreadSafe
+public class HoodieReadStats implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  // Total number of records inserted, some of which may be converted to updates (for small file handling)
+  protected long numInserts;
+  // Total number of updates
+  protected long numUpdates;
+  // Total number of records deleted
+  protected long numDeletes;
+
+  public HoodieReadStats() {
+  }
+
+  public HoodieReadStats(long numInserts, long numUpdates, long numDeletes) {
+    this.numInserts = numInserts;
+    this.numUpdates = numUpdates;
+    this.numDeletes = numDeletes;
+  }
+
+  public long getNumInserts() {
+    return numInserts;
+  }
+
+  public long getNumUpdates() {
+    return numUpdates;
+  }
+
+  public long getNumDeletes() {
+    return numDeletes;
+  }
+
+  public void incrementNumInserts() {
+    numInserts++;
+  }
+
+  public void incrementNumUpdates() {
+    numUpdates++;
+  }
+
+  public void incrementNumDeletes() {
+    numDeletes++;
+  }
+}
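
For reference, the counters are meant to be bumped once per merged record and read back after the merge completes; a minimal sketch:

    HoodieReadStats stats = new HoodieReadStats();
    stats.incrementNumInserts();   // base record with no matching log record
    stats.incrementNumUpdates();   // base record merged with a log record
    stats.incrementNumDeletes();   // merge resolved to a delete
    // stats.getNumInserts() == 1, stats.getNumUpdates() == 1, stats.getNumDeletes() == 1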
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
index dd8d352f132..6c3e2a03c56 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
@@ -212,6 +212,9 @@ public class HoodieTestReaderContext extends 
HoodieReaderContext<IndexedRecord>
       String fieldName
   ) {
     Schema.Field field = recordSchema.getField(fieldName);
+    if (field == null) {
+      return null;
+    }
     int pos = field.pos();
     return record.get(pos);
   }
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
index 277d0305c4e..c8c3a956f55 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
@@ -23,6 +23,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -82,7 +83,11 @@ public class ObjectInspectorCache {
 
   public Object getValue(ArrayWritable record, Schema schema, String fieldName) {
     ArrayWritableObjectInspector objectInspector = getObjectInspector(schema);
-    return objectInspector.getStructFieldData(record, objectInspector.getStructFieldRef(fieldName));
+    StructField structFieldRef = objectInspector.getStructFieldRef(fieldName);
+    if (structFieldRef == null) {
+      return null;
+    }
+    return objectInspector.getStructFieldData(record, structFieldRef);
   }
 
   public ArrayWritableObjectInspector getObjectInspector(Schema schema) {
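
With the guard in place, getValue degrades to null instead of failing when a field is absent from the schema, so callers can probe optional meta columns directly. A hedged usage sketch; `cache`, `row`, and `schema` are assumed to be set up elsewhere:

    // Sketch: probing an optional meta column no longer throws when the
    // field is missing from the schema; it simply yields null.
    Object operation = cache.getValue(row, schema, "_hoodie_operation");
    if (operation == null) {
      // field not present in this schema; fall back to other delete signals
    }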
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 84df68058bd..65d6d42b8ea 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, HoodieTableSchema, HoodieTableState, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
 import org.apache.hudi.avro.AvroSchemaUtils
 import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation, HoodieCDCFileGroupSplit}
@@ -31,6 +29,8 @@ import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.storage.StorageConfiguration
 import org.apache.hudi.storage.hadoop.{HadoopStorageConfiguration, HoodieHadoopStorage}
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -38,7 +38,6 @@ import org.apache.spark.sql.catalyst.expressions.JoinedRow
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, 
OnHeapColumnVector}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.PARQUET_VECTORIZED_READER_ENABLED
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchUtils}
@@ -163,7 +162,7 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
             .getSparkPartitionedFileUtils.getPathFromPartitionedFile(file))
           fileSliceMapping.getSlice(filegroupName) match {
             case Some(fileSlice) if !isCount && (requiredSchema.nonEmpty || 
fileSlice.getLogFiles.findAny().isPresent) =>
-              val readerContext = new SparkFileFormatInternalRowReaderContext(parquetFileReader.value, tableState.recordKeyField, filters, requiredFilters)
+              val readerContext = new SparkFileFormatInternalRowReaderContext(parquetFileReader.value, filters, requiredFilters)
               val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
                 .builder().setConf(storageConf).setBasePath(tableState.tablePath).build
               val props = metaClient.getTableConfig.getProps
diff --git a/hudi-spark-datasource/hudi-spark/pom.xml 
b/hudi-spark-datasource/hudi-spark/pom.xml
index c723b03ccc9..01b4c8d10d9 100644
--- a/hudi-spark-datasource/hudi-spark/pom.xml
+++ b/hudi-spark-datasource/hudi-spark/pom.xml
@@ -304,6 +304,13 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <version>${zk-curator.version}</version>
+      <scope>test</scope>
+    </dependency>
+
     <!-- Hadoop -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
similarity index 100%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
rename to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
similarity index 100%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
rename to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestMultiWriterWithPreferWriterIngestion.java
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
similarity index 100%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
rename to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
similarity index 100%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
rename to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java
similarity index 100%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java
rename to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
similarity index 100%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
rename to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
similarity index 100%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
rename to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
similarity index 100%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
rename to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java
similarity index 100%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java
rename to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
similarity index 100%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
rename to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
similarity index 100%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
rename to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
similarity index 95%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
rename to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 9f0e7547304..ee8e8d6512d 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -7,13 +7,14 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.hudi.table.action.compact;
@@ -45,8 +46,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -237,8 +238,10 @@ public class TestAsyncCompaction extends 
CompactionTestBase {
 
       // validate the compaction plan does not include pending log files.
       HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(
-          metaClient.reloadActiveTimeline().readCompactionPlanAsBytes(INSTANT_GENERATOR.getCompactionRequestedInstant(compactionInstantTime)).get());
-      assertTrue(compactionPlan.getOperations().stream().noneMatch(op -> op.getDeltaFilePaths().stream().anyMatch(deltaFile -> deltaFile.contains(pendingInstantTime))),
+          metaClient.reloadActiveTimeline()
+              .readCompactionPlanAsBytes(INSTANT_GENERATOR.getCompactionRequestedInstant(compactionInstantTime)).get());
+      assertTrue(compactionPlan.getOperations().stream()
+              .noneMatch(op -> op.getDeltaFilePaths().stream().anyMatch(deltaFile -> deltaFile.contains(pendingInstantTime))),
           "compaction plan should not include pending log files");
 
       // execute inflight compaction.
@@ -329,11 +332,13 @@ public class TestAsyncCompaction extends 
CompactionTestBase {
     assertNull(tryScheduleCompaction(compactionInstantTime, client, cfg), "Compaction Instant can be scheduled with older timestamp");
 
     // Schedule with timestamp same as that of committed instant
-    assertNull(tryScheduleCompaction(secondInstantTime, client, cfg), "Compaction Instant to be scheduled can have same timestamp as committed instant");
+    assertNull(tryScheduleCompaction(secondInstantTime, client, cfg),
+        "Compaction Instant to be scheduled can have same timestamp as committed instant");
 
     final String compactionInstantTime2 = client.createNewInstantTime();
     // Schedule compaction but do not run it
-    assertNotNull(tryScheduleCompaction(compactionInstantTime2, client, cfg), "Compaction Instant can be scheduled with greater timestamp");
+    assertNotNull(tryScheduleCompaction(compactionInstantTime2, client, cfg),
+        "Compaction Instant can be scheduled with greater timestamp");
   }
 
   @Test
@@ -407,7 +412,8 @@ public class TestAsyncCompaction extends CompactionTestBase 
{
       metaClient.reloadActiveTimeline();
       HoodieInstant pendingCompactionInstant =
           metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
-      assertEquals(compactionInstantTime, pendingCompactionInstant.requestedTime(), "Pending Compaction instant has expected instant time");
+      assertEquals(compactionInstantTime, pendingCompactionInstant.requestedTime(),
+          "Pending Compaction instant has expected instant time");
 
       Set<HoodieFileGroupId> fileGroupsBeforeReplace = getAllFileGroups(hoodieTable, dataGen.getPartitionPaths());
       // replace by using insertOverwrite
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
similarity index 97%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
rename to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 29232732c15..c32d0c6739a 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -7,13 +7,14 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.hudi.table.action.compact;
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
similarity index 100%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
rename to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
similarity index 100%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
rename to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
similarity index 100%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
rename to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
similarity index 100%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
rename to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
similarity index 100%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
rename to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
similarity index 99%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
rename to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index 552f2775494..f8686dff0e2 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -80,8 +80,8 @@ import java.util.stream.Stream;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
-import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertAll;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -622,7 +622,7 @@ public class TestHoodieSparkMergeOnReadTableRollback 
extends SparkClientFunction
         .map(stat -> stat.getNumWrites() + stat.getNumUpdateWrites())
         .reduce(0L, Long::sum);
   }
-
+  
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   void testMORTableRestore(boolean restoreAfterCompaction) throws Exception {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
similarity index 100%
rename from 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
rename to 
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index db3661cb648..2f6f3530213 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -87,7 +87,7 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
     val reader = sparkAdapter.createParquetFileReader(vectorized = false, spark.sessionState.conf, Map.empty, storageConf.unwrapAs(classOf[Configuration]))
     val metaClient = HoodieTableMetaClient.builder().setConf(storageConf).setBasePath(tablePath).build
     val recordKeyField = metaClient.getTableConfig.getRecordKeyFields.get()(0)
-    new SparkFileFormatInternalRowReaderContext(reader, recordKeyField, Seq.empty, Seq.empty)
+    new SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty)
   }
 
   override def commitToTable(recordList: util.List[HoodieRecord[_]], 
operation: String, options: util.Map[String, String]): Unit = {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
index e10476ed4de..9e9dd995156 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
@@ -92,7 +92,7 @@ class TestSpark35RecordPositionMetadataColumn extends 
SparkClientFunctionalTestH
     assertFalse(allBaseFiles.isEmpty)
 
     val requiredSchema = SparkFileFormatInternalRowReaderContext.getAppliedRequiredSchema(dataSchema,
-        new SparkFileFormatInternalRowReaderContext(reader, "userid", Seq.empty, Seq.empty).supportsParquetRowIndex)
+      new SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty).supportsParquetRowIndex)
 
     // Confirm if the schema is as expected.
     if (HoodieSparkUtils.gteqSpark3_5) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
index 293208d8ca6..053446fb7c2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
@@ -33,6 +33,7 @@ import 
org.apache.hudi.functional.ColumnStatIndexTestBase.{ColumnStatsTestCase,
 import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY
 import org.apache.hudi.metadata.HoodieMetadataFileSystemView
 import org.apache.hudi.util.JavaConversions
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression, GreaterThan, Literal}
 import org.apache.spark.sql.types.StringType
@@ -42,6 +43,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
 
 import java.io.File
+
 import scala.collection.JavaConverters._
 
 class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase {
@@ -145,7 +147,8 @@ class TestColumnStatsIndexWithSQL extends 
ColumnStatIndexTestBase {
       PRECOMBINE_FIELD.key -> "c1",
       HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
       DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true",
-      DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL
+      DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
+      HoodieWriteConfig.RECORD_MERGE_MODE.key -> "COMMIT_TIME_ORDERING"
     ) ++ metadataOpts
     setupTable(testCase, metadataOpts, commonOpts, shouldValidate = true)
     val lastDf = dfList.last
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index 9e0c933c01e..7dbe6f840fa 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -19,22 +19,22 @@
 
 package org.apache.hudi.functional
 
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieReaderConfig}
-import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils}
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, lit}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{Tag, Test}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
@@ -136,7 +136,6 @@ class TestMORDataSourceStorage extends 
SparkClientFunctionalTestHarness {
 
   @Test
   def testMergeOnReadStorageDefaultCompaction(): Unit = {
-    val preCombineField = "fare"
     val commonOpts = Map(
       "hoodie.insert.shuffle.parallelism" -> "4",
       "hoodie.upsert.shuffle.parallelism" -> "4",
@@ -151,7 +150,6 @@ class TestMORDataSourceStorage extends 
SparkClientFunctionalTestHarness {
     )
 
     var options: Map[String, String] = commonOpts
-    options += (DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> preCombineField)
     val dataGen = new HoodieTestDataGenerator(0xDEEF)
     val fs = HadoopFSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
     // Bulk Insert Operation
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
index 82b6e0d23a9..be9b4354d8b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
@@ -17,25 +17,25 @@
 
 package org.apache.spark.sql.hudi.dml
 
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig, HoodieReaderConfig, HoodieStorageConfig}
 import org.apache.hudi.common.engine.HoodieLocalEngineContext
-import org.apache.hudi.common.function.SerializableFunctionUnchecked
 import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.table.log.HoodieLogFileReader
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType
+import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.common.table.view.{FileSystemViewManager, FileSystemViewStorageConfig, SyncableFileSystemView}
-import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestUtils
-import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.common.util.CompactionUtils
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.metadata.HoodieTableMetadata
-import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
 
 import org.apache.avro.Schema
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 
-import java.util.function.Predicate
 import java.util.{Collections, List, Optional}
 
 import scala.collection.JavaConverters._
@@ -238,6 +238,25 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
 
       if (tableType.equals("mor")) {
         validateLogBlock(basePath, 2, Seq(Seq("price", "_ts"), Seq("_ts", "description")), true)
+
+        spark.sql(s"set ${HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key} = 3")
+        // Partial updates that trigger compaction
+        spark.sql(
+          s"""
+             |merge into $tableName t0
+             |using ( select 2 as id, '_a2' as name, 18.0 as price, 1275 as ts
+             |union select 3 as id, '_a3' as name, 28.0 as price, 1280 as ts) s0
+             |on t0.id = s0.id
+             |when matched then update set price = s0.price, _ts = s0.ts
+             |""".stripMargin)
+        validateCompactionExecuted(basePath)
+        checkAnswer(s"select id, name, price, _ts, description from $tableName")(
+          Seq(1, "a1", 12.0, 1023, "a1: updated desc1"),
+          Seq(2, "a2", 18.0, 1275, "a2: updated desc2"),
+          Seq(3, "a3", 28.0, 1280, "a3: desc3")
+        )
+        spark.sql(s"set ${HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key}"
+          + s" = ${HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.defaultValue()}")
       }
 
       if (tableType.equals("cow")) {
@@ -422,4 +441,13 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
       assertEquals(expectedSchema, actualSchema)
     }
   }
+
+  def validateCompactionExecuted(basePath: String): Unit = {
+    val storageConf = HoodieTestUtils.getDefaultStorageConf
+    val metaClient: HoodieTableMetaClient =
+      HoodieTableMetaClient.builder.setConf(storageConf).setBasePath(basePath).build
+    val lastCommit = metaClient.getActiveTimeline.getCommitsTimeline.lastInstant().get()
+    assertEquals(HoodieTimeline.COMMIT_ACTION, lastCommit.getAction)
+    CompactionUtils.getCompactionPlan(metaClient, lastCommit.requestedTime())
+  }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
index d5a14cd4a4c..c1d8b16b27f 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -104,6 +104,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends 
HoodieDeltaStreamerT
     props.setProperty("hoodie.write.lock.provider",
         "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
     props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
+    props.setProperty("hoodie.merge.use.record.positions", "false");
     UtilitiesTestBase.Helpers.savePropsToDFS(props, storage, propsFilePath);
     // Keep it higher than batch-size to test continuous mode
     int totalRecords = 3000;
