This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 39f2a06c85 [HUDI-3979] Optimize out mandatory columns when no merging 
is performed (#5430)
39f2a06c85 is described below

commit 39f2a06c85d237c51c3bf2b1e2f6876e7eb65e06
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Fri Jul 22 15:32:44 2022 -0700

    [HUDI-3979] Optimize out mandatory columns when no merging is performed 
(#5430)
    
    For MOR tables, when no merging is performed there is no point in reading 
either the primary-key or the pre-combine-key values (unless the query 
references them). Skipping these columns can save substantial resources that 
would otherwise be wasted reading them from storage.
---
 .../org/apache/hudi/BaseFileOnlyRelation.scala     |   5 +-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala | 100 +++++++++++--------
 .../org/apache/hudi/HoodieMergeOnReadRDD.scala     |  24 +++--
 .../hudi/MergeOnReadIncrementalRelation.scala      |  29 +++---
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  | 109 ++++++++++++++++++---
 .../org/apache/hudi/TestHoodieRelations.scala      |   2 -
 .../functional/TestParquetColumnProjection.scala   |   2 +-
 .../apache/spark/sql/hudi/TestInsertTable.scala    |   1 +
 8 files changed, 187 insertions(+), 85 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index 29b565712d..b7033c3bfc 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -60,9 +60,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
   override protected val shouldExtractPartitionValuesFromPartitionPath: 
Boolean =
     internalSchemaOpt.isEmpty
 
-  override lazy val mandatoryFields: Seq[String] =
-  // TODO reconcile, record's key shouldn't be mandatory for base-file only 
relation
-    Seq(recordKeyField)
+  override lazy val mandatoryFields: Seq[String] = Seq.empty
 
   override def imbueConfigs(sqlContext: SQLContext): Unit = {
     super.imbueConfigs(sqlContext)
@@ -73,6 +71,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
                                     partitionSchema: StructType,
                                     dataSchema: HoodieTableSchema,
                                     requiredSchema: HoodieTableSchema,
+                                    requestedColumns: Array[String],
                                     filters: Array[Filter]): HoodieUnsafeRDD = 
{
 
     val baseFileReader = createBaseFileReader(
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index f2d2c31f67..ff6515db32 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hbase.io.hfile.CacheConfig
 import org.apache.hadoop.mapred.JobConf
-import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, 
createHFileReader, generateUnsafeProjection, getPartitionPath, projectSchema}
+import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, 
convertToAvroSchema, createHFileReader, generateUnsafeProjection, 
getPartitionPath, projectSchema}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
@@ -204,6 +204,10 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
     shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
   }
 
+  /**
+   * NOTE: These fields are accessed by the [[NestedSchemaPruning]] component which 
is only enabled for
+   *       Spark >= 3.1
+   */
   lazy val (fileFormat: FileFormat, fileFormatClassName: String) =
     metaClient.getTableConfig.getBaseFileFormat match {
       case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
@@ -258,12 +262,11 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
    *
    * Check scala-doc for [[shouldExtractPartitionValuesFromPartitionPath]] for 
more details
    */
-  def dataSchema: StructType =
-    if (shouldExtractPartitionValuesFromPartitionPath) {
-      prunePartitionColumns(tableStructSchema)
-    } else {
-      tableStructSchema
-    }
+  def dataSchema: StructType = if 
(shouldExtractPartitionValuesFromPartitionPath) {
+    prunePartitionColumns(tableStructSchema)
+  } else {
+    tableStructSchema
+  }
 
   /**
    * Determines whether relation's schema could be pruned by Spark's Optimizer
@@ -346,7 +349,7 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
     if (fileSplits.isEmpty) {
       sparkSession.sparkContext.emptyRDD
     } else {
-      val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, 
requiredDataSchema, filters)
+      val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, 
requiredDataSchema, targetColumns, filters)
 
       // NOTE: In case when partition columns have been pruned from the 
required schema, we have to project
       //       the rows from the pruned schema back into the one expected by 
the caller
@@ -369,17 +372,19 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
   /**
    * Composes RDD provided file splits to read from, table and partition 
schemas, data filters to be applied
    *
-   * @param fileSplits      file splits to be handled by the RDD
-   * @param partitionSchema target table's partition schema
-   * @param dataSchema      target table's data files' schema
-   * @param requiredSchema  projected schema required by the reader
-   * @param filters         data filters to be applied
+   * @param fileSplits       file splits to be handled by the RDD
+   * @param partitionSchema  target table's partition schema
+   * @param dataSchema       target table's data files' schema
+   * @param requiredSchema   projected schema required by the reader
+   * @param requestedColumns columns requested by the query
+   * @param filters          data filters to be applied
    * @return instance of RDD (implementing [[HoodieUnsafeRDD]])
    */
   protected def composeRDD(fileSplits: Seq[FileSplit],
                            partitionSchema: StructType,
                            dataSchema: HoodieTableSchema,
                            requiredSchema: HoodieTableSchema,
+                           requestedColumns: Array[String],
                            filters: Array[Filter]): HoodieUnsafeRDD
 
   /**
@@ -551,37 +556,48 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
                                      requiredSchema: HoodieTableSchema,
                                      filters: Seq[Filter],
                                      options: Map[String, String],
-                                     hadoopConf: Configuration): 
PartitionedFile => Iterator[InternalRow] = {
-    val hfileReader = createHFileReader(
-      spark = spark,
-      dataSchema = dataSchema,
-      requiredSchema = requiredSchema,
-      filters = filters,
-      options = options,
-      hadoopConf = hadoopConf
-    )
-
-    val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
-      sparkSession = spark,
-      dataSchema = dataSchema.structTypeSchema,
-      partitionSchema = partitionSchema,
-      requiredSchema = requiredSchema.structTypeSchema,
-      filters = filters,
-      options = options,
-      hadoopConf = hadoopConf,
-      // We're delegating to Spark to append partition values to every row 
only in cases
-      // when these corresponding partition-values are not persisted w/in the 
data file itself
-      appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
-    )
+                                     hadoopConf: Configuration): 
BaseFileReader = {
+    val tableBaseFileFormat = tableConfig.getBaseFileFormat
+
+    // NOTE: PLEASE READ CAREFULLY
+    //       Lambda returned from this method is going to be invoked on the 
executor, and therefore
+    //       we have to eagerly initialize the reader for the table's base-file 
format here, outside
+    //       of the lambda (only the reader matching the table's base-file 
format is created). This is
+    //       required to avoid serialization of the whole relation (containing 
file-index for ex) and
+    //       passing it to the executor
+    val reader = tableBaseFileFormat match {
+      case HoodieFileFormat.PARQUET =>
+        HoodieDataSourceHelper.buildHoodieParquetReader(
+          sparkSession = spark,
+          dataSchema = dataSchema.structTypeSchema,
+          partitionSchema = partitionSchema,
+          requiredSchema = requiredSchema.structTypeSchema,
+          filters = filters,
+          options = options,
+          hadoopConf = hadoopConf,
+          // We're delegating to Spark to append partition values to every row 
only in cases
+          // when these corresponding partition-values are not persisted w/in 
the data file itself
+          appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
+        )
+
+      case HoodieFileFormat.HFILE =>
+        createHFileReader(
+          spark = spark,
+          dataSchema = dataSchema,
+          requiredSchema = requiredSchema,
+          filters = filters,
+          options = options,
+          hadoopConf = hadoopConf
+        )
+
+      case _ => throw new UnsupportedOperationException(s"Base file format is 
not currently supported ($tableBaseFileFormat)")
+    }
 
     partitionedFile => {
       val extension = FSUtils.getFileExtension(partitionedFile.filePath)
-      if (HoodieFileFormat.PARQUET.getFileExtension.equals(extension)) {
-        parquetReader.apply(partitionedFile)
-      } else if (HoodieFileFormat.HFILE.getFileExtension.equals(extension)) {
-        hfileReader.apply(partitionedFile)
+      if (tableBaseFileFormat.getFileExtension.equals(extension)) {
+        reader.apply(partitionedFile)
       } else {
-        throw new UnsupportedOperationException(s"Base file format not 
supported by Spark DataSource ($partitionedFile)")
+        throw new UnsupportedOperationException(s"Invalid base-file format 
($extension), expected ($tableBaseFileFormat)")
       }
     }
   }
@@ -629,6 +645,8 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
 
 object HoodieBaseRelation extends SparkAdapterSupport {
 
+  type BaseFileReader = PartitionedFile => Iterator[InternalRow]
+
   private def generateUnsafeProjection(from: StructType, to: StructType) =
     sparkAdapter.getCatalystExpressionUtils().generateUnsafeProjection(from, 
to)
 
@@ -676,7 +694,7 @@ object HoodieBaseRelation extends SparkAdapterSupport {
                                 requiredSchema: HoodieTableSchema,
                                 filters: Seq[Filter],
                                 options: Map[String, String],
-                                hadoopConf: Configuration): PartitionedFile => 
Iterator[InternalRow] = {
+                                hadoopConf: Configuration): BaseFileReader = {
     val hadoopConfBroadcast =
       spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index c9080d021e..c4c70cb414 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -23,6 +23,7 @@ import org.apache.avro.generic.{GenericRecord, 
GenericRecordBuilder, IndexedReco
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.HoodieBaseRelation.BaseFileReader
 import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
 import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, 
collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, 
projectRowUnsafe, resolveAvroSchemaNullability}
 import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
@@ -55,11 +56,14 @@ import scala.util.Try
 
 case class HoodieMergeOnReadPartition(index: Int, split: 
HoodieMergeOnReadFileSplit) extends Partition
 
+case class HoodieMergeOnReadBaseFileReaders(fullSchemaFileReader: 
BaseFileReader,
+                                            
requiredSchemaFileReaderForMerging: BaseFileReader,
+                                            
requiredSchemaFileReaderForNoMerging: BaseFileReader)
+
 class HoodieMergeOnReadRDD(@transient sc: SparkContext,
                            @transient config: Configuration,
-                           fullSchemaFileReader: PartitionedFile => 
Iterator[InternalRow],
-                           requiredSchemaFileReader: PartitionedFile => 
Iterator[InternalRow],
-                           tableSchema: HoodieTableSchema,
+                           fileReaders: HoodieMergeOnReadBaseFileReaders,
+                           dataSchema: HoodieTableSchema,
                            requiredSchema: HoodieTableSchema,
                            tableState: HoodieTableState,
                            mergeType: String,
@@ -86,13 +90,13 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
     val iter = mergeOnReadPartition.split match {
       case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
-        requiredSchemaFileReader.apply(dataFileOnlySplit.dataFile.get)
+        
fileReaders.requiredSchemaFileReaderForNoMerging.apply(dataFileOnlySplit.dataFile.get)
 
       case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
         new LogFileIterator(logFileOnlySplit, getConfig)
 
       case split if 
mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
-        val baseFileIterator = 
requiredSchemaFileReader.apply(split.dataFile.get)
+        val baseFileIterator = 
fileReaders.requiredSchemaFileReaderForNoMerging.apply(split.dataFile.get)
         new SkipMergeIterator(split, baseFileIterator, getConfig)
 
       case split if 
mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
@@ -126,9 +130,9 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     //       then we can avoid reading and parsing the records w/ _full_ 
schema, and instead only
     //       rely on projected one, nevertheless being able to perform merging 
correctly
     if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName))
-      (fullSchemaFileReader(split.dataFile.get), tableSchema)
+      (fileReaders.fullSchemaFileReader(split.dataFile.get), dataSchema)
     else
-      (requiredSchemaFileReader(split.dataFile.get), requiredSchema)
+      (fileReaders.requiredSchemaFileReaderForMerging(split.dataFile.get), 
requiredSchema)
   }
 
   override protected def getPartitions: Array[Partition] =
@@ -152,7 +156,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     protected override val requiredAvroSchema: Schema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
     protected override val requiredStructTypeSchema: StructType = 
requiredSchema.structTypeSchema
 
-    protected val logFileReaderAvroSchema: Schema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
+    protected val logFileReaderAvroSchema: Schema = new 
Schema.Parser().parse(dataSchema.avroSchemaStr)
 
     protected val recordBuilder: GenericRecordBuilder = new 
GenericRecordBuilder(requiredAvroSchema)
     protected var recordToLoad: InternalRow = _
@@ -167,7 +171,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     private val requiredSchemaFieldOrdinals: List[Int] = 
collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)
 
     private var logScanner = {
-      val internalSchema = 
tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
+      val internalSchema = 
dataSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
       HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), 
logFileReaderAvroSchema, tableState,
         maxCompactionMemoryInBytes, config, internalSchema)
     }
@@ -232,7 +236,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     override def hasNext: Boolean = {
       if (baseFileIterator.hasNext) {
         val curRow = baseFileIterator.next()
-        recordToLoad = unsafeProjection(curRow)
+        recordToLoad = curRow
         true
       } else {
         super[LogFileIterator].hasNext
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 38945cec9f..6fa130ac8c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -61,6 +61,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
                                     partitionSchema: StructType,
                                     dataSchema: HoodieTableSchema,
                                     requiredSchema: HoodieTableSchema,
+                                    requestedColumns: Array[String],
                                     filters: Array[Filter]): 
HoodieMergeOnReadRDD = {
     val fullSchemaParquetReader = createBaseFileReader(
       spark = sqlContext.sparkSession,
@@ -81,23 +82,25 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
       hadoopConf = embedInternalSchema(new Configuration(conf), 
internalSchemaOpt)
     )
 
-    val requiredSchemaParquetReader = createBaseFileReader(
-      spark = sqlContext.sparkSession,
-      partitionSchema = partitionSchema,
-      dataSchema = dataSchema,
-      requiredSchema = requiredSchema,
-      filters = filters ++ incrementalSpanRecordFilters,
-      options = optParams,
-      // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
-      //       to configure Parquet reader appropriately
-      hadoopConf = embedInternalSchema(new Configuration(conf), 
requiredSchema.internalSchema)
-    )
+    val (requiredSchemaBaseFileReaderMerging, 
requiredSchemaBaseFileReaderNoMerging) =
+      createMergeOnReadBaseFileReaders(partitionSchema, dataSchema, 
requiredSchema, requestedColumns, filters ++ incrementalSpanRecordFilters)
 
     val hoodieTableState = getTableState
     // TODO(HUDI-3639) implement incremental span record filtering w/in RDD to 
make sure returned iterator is appropriately
     //                 filtered, since file-reader might not be capable to 
perform filtering
-    new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, 
fullSchemaParquetReader, requiredSchemaParquetReader,
-      dataSchema, requiredSchema, hoodieTableState, mergeType, fileSplits)
+    new HoodieMergeOnReadRDD(
+      sqlContext.sparkContext,
+      config = jobConf,
+      fileReaders = HoodieMergeOnReadBaseFileReaders(
+        fullSchemaFileReader = fullSchemaParquetReader,
+        requiredSchemaFileReaderForMerging = 
requiredSchemaBaseFileReaderMerging,
+        requiredSchemaFileReaderForNoMerging = 
requiredSchemaBaseFileReaderNoMerging
+      ),
+      dataSchema = dataSchema,
+      requiredSchema = requiredSchema,
+      tableState = hoodieTableState,
+      mergeType = mergeType,
+      fileSplits = fileSplits)
   }
 
   override protected def collectFileSplits(partitionFilters: Seq[Expression], 
dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 9c69190f98..c6d4eafafc 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -20,14 +20,17 @@ package org.apache.hudi
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, convertToAvroSchema}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath
+import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
 import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.sources.Filter
@@ -47,9 +50,27 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
 
   override type FileSplit = HoodieMergeOnReadFileSplit
 
-  override lazy val mandatoryFields: Seq[String] =
+  /**
+   * NOTE: These are the fields that are required to properly fulfil 
Merge-on-Read (MOR)
+   *       semantics:
+   *
+   *       <ol>
+   *         <li>Primary key is required to make sure we're able to correlate 
records from the base
+   *         file with the updated records from the delta-log file</li>
+   *         <li>Pre-combine key is required to properly perform the combining 
(or merging) of the
+   *         existing and updated records</li>
+   *       </ol>
+   *
+   *       However, in cases when merging is NOT performed (e.g., if the 
file-group only contains base
+   *       files but no delta-log files, or if the query-type is equal to 
[["skip_merge"]]) neither
+   *       the primary-key nor the pre-combine-key is required to be fetched from 
storage (unless requested
+   *       by the query), therefore saving on throughput
+   */
+  protected lazy val mandatoryFieldsForMerging: Seq[String] =
     Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
 
+  override lazy val mandatoryFields: Seq[String] = mandatoryFieldsForMerging
+
   protected val mergeType: String = 
optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
     DataSourceReadOptions.REALTIME_MERGE.defaultValue)
 
@@ -62,8 +83,9 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
                                     partitionSchema: StructType,
                                     dataSchema: HoodieTableSchema,
                                     requiredSchema: HoodieTableSchema,
+                                    requestedColumns: Array[String],
                                     filters: Array[Filter]): 
HoodieMergeOnReadRDD = {
-    val fullSchemaParquetReader = createBaseFileReader(
+    val fullSchemaBaseFileReader = createBaseFileReader(
       spark = sqlContext.sparkSession,
       partitionSchema = partitionSchema,
       dataSchema = dataSchema,
@@ -79,21 +101,23 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
       hadoopConf = embedInternalSchema(new Configuration(conf), 
internalSchemaOpt)
     )
 
-    val requiredSchemaParquetReader = createBaseFileReader(
-      spark = sqlContext.sparkSession,
-      partitionSchema = partitionSchema,
-      dataSchema = dataSchema,
-      requiredSchema = requiredSchema,
-      filters = filters,
-      options = optParams,
-      // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
-      //       to configure Parquet reader appropriately
-      hadoopConf = embedInternalSchema(new Configuration(conf), 
requiredSchema.internalSchema)
-    )
+    val (requiredSchemaBaseFileReaderMerging, 
requiredSchemaBaseFileReaderNoMerging) =
+      createMergeOnReadBaseFileReaders(partitionSchema, dataSchema, 
requiredSchema, requestedColumns, filters)
 
     val tableState = getTableState
-    new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, 
fullSchemaParquetReader, requiredSchemaParquetReader,
-      dataSchema, requiredSchema, tableState, mergeType, fileSplits)
+    new HoodieMergeOnReadRDD(
+      sqlContext.sparkContext,
+      config = jobConf,
+      fileReaders = HoodieMergeOnReadBaseFileReaders(
+        fullSchemaFileReader = fullSchemaBaseFileReader,
+        requiredSchemaFileReaderForMerging = 
requiredSchemaBaseFileReaderMerging,
+        requiredSchemaFileReaderForNoMerging = 
requiredSchemaBaseFileReaderNoMerging
+      ),
+      dataSchema = dataSchema,
+      requiredSchema = requiredSchema,
+      tableState = tableState,
+      mergeType = mergeType,
+      fileSplits = fileSplits)
   }
 
   protected override def collectFileSplits(partitionFilters: Seq[Expression], 
dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
@@ -122,6 +146,61 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
       HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles)
     }.toList
   }
+
+  protected def createMergeOnReadBaseFileReaders(partitionSchema: StructType,
+                                                 dataSchema: HoodieTableSchema,
+                                                 requiredDataSchema: 
HoodieTableSchema,
+                                                 requestedColumns: 
Array[String],
+                                                 filters: Array[Filter]): 
(BaseFileReader, BaseFileReader) = {
+    val requiredSchemaFileReaderMerging = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      partitionSchema = partitionSchema,
+      dataSchema = dataSchema,
+      requiredSchema = requiredDataSchema,
+      filters = filters,
+      options = optParams,
+      // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
+      //       to configure Parquet reader appropriately
+      hadoopConf = embedInternalSchema(new Configuration(conf), 
requiredDataSchema.internalSchema)
+    )
+
+    // Check whether fields required for merging were also requested to be 
fetched
+    // by the query:
+    //    - In case they were, there's no optimization we could apply here (we 
will have
+    //    to fetch such fields)
+    //    - In case they were not, we will provide 2 separate file-readers
+    //        a) One which would be applied to file-groups w/ delta-logs 
(merging)
+    //        b) One which would be applied to file-groups w/ no delta-logs or
+    //           in case query-mode is skipping merging
+    val requiredColumns = 
mandatoryFieldsForMerging.map(HoodieAvroUtils.getRootLevelFieldName)
+    if (requiredColumns.forall(requestedColumns.contains)) {
+      (requiredSchemaFileReaderMerging, requiredSchemaFileReaderMerging)
+    } else {
+      val prunedRequiredSchema = {
+        val superfluousColumnNames = 
requiredColumns.filterNot(requestedColumns.contains)
+        val prunedStructSchema =
+          StructType(requiredDataSchema.structTypeSchema.fields
+            .filterNot(f => superfluousColumnNames.contains(f.name)))
+
+        HoodieTableSchema(prunedStructSchema, 
convertToAvroSchema(prunedStructSchema).toString)
+      }
+
+      val requiredSchemaFileReaderNoMerging = createBaseFileReader(
+        spark = sqlContext.sparkSession,
+        partitionSchema = partitionSchema,
+        dataSchema = dataSchema,
+        requiredSchema = prunedRequiredSchema,
+        filters = filters,
+        options = optParams,
+        // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
+        //       to configure Parquet reader appropriately
+        hadoopConf = embedInternalSchema(new Configuration(conf), 
requiredDataSchema.internalSchema)
+      )
+
+      (requiredSchemaFileReaderMerging, requiredSchemaFileReaderNoMerging)
+    }
+  }
+
 }
 
 object MergeOnReadSnapshotRelation {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieRelations.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieRelations.scala
index a8db68edd1..ce9a5d571e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieRelations.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieRelations.scala
@@ -51,6 +51,4 @@ class TestHoodieRelations {
       requiredStructSchema.fields.toSeq
     )
   }
-
-
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index 4366e8c95f..b24a341d4a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -54,7 +54,7 @@ class TestParquetColumnProjection extends 
SparkClientFunctionalTestHarness with
     DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
classOf[NonpartitionedKeyGenerator].getName
   )
 
-  @Disabled("HUDI-3896")
+  @Disabled("Currently disabled b/c of the fallback to HadoopFsRelation")
   @Test
   def testBaseFileOnlyViewRelation(): Unit = {
     val tablePath = s"$basePath/cow"
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 54dd45f3f5..57c826af92 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieDuplicateKeyException
 import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.internal.SQLConf
 
 import java.io.File
 

Reply via email to