alexeykudinkin commented on a change in pull request #4888:
URL: https://github.com/apache/hudi/pull/4888#discussion_r835399807



##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -70,34 +76,59 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     )
     .getOrElse(new Properties())
 
+  private val whitelistedPayloadClasses: Set[String] = Seq(
+    classOf[OverwriteWithLatestAvroPayload]
+  ).map(_.getName).toSet
+
   override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
     val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
     val iter = mergeOnReadPartition.split match {
       case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
-        requiredSchemaFileReader(dataFileOnlySplit.dataFile.get)
+        requiredSchemaFileReader.apply(dataFileOnlySplit.dataFile.get)
+
       case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
-        logFileIterator(logFileOnlySplit, getConfig)
-      case skipMergeSplit if 
skipMergeSplit.mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
 =>
-        skipMergeFileIterator(skipMergeSplit, 
requiredSchemaFileReader(skipMergeSplit.dataFile.get), getConfig)
-      case payloadCombineSplit
-        if 
payloadCombineSplit.mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL)
 =>
-        payloadCombineFileIterator(payloadCombineSplit, 
fullSchemaFileReader(payloadCombineSplit.dataFile.get),
-          getConfig)
+        new LogFileIterator(logFileOnlySplit, getConfig)
+
+      case split if 
mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
+        val baseFileIterator = 
requiredSchemaFileReader.apply(split.dataFile.get)
+        new SkipMergeIterator(split, baseFileIterator, getConfig)
+
+      case split if 
mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
+        val (baseFileIterator, schema) = readBaseFile(split)
+        new RecordMergingFileIterator(split, baseFileIterator, schema, 
getConfig)
+
       case _ => throw new HoodieException(s"Unable to select an Iterator to 
read the Hoodie MOR File Split for " +
         s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" +
         s"log paths: ${mergeOnReadPartition.split.logFiles.toString}" +
-        s"hoodie table path: ${mergeOnReadPartition.split.tablePath}" +
+        s"hoodie table path: ${tableState.tablePath}" +
         s"spark partition Index: ${mergeOnReadPartition.index}" +
-        s"merge type: ${mergeOnReadPartition.split.mergeType}")
+        s"merge type: ${mergeType}")
     }
+
     if (iter.isInstanceOf[Closeable]) {
       // register a callback to close logScanner which will be executed on 
task completion.
       // when tasks finished, this method will be called, and release 
resources.
       Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => 
iter.asInstanceOf[Closeable].close()))
     }
+
     iter
   }
 
+  private def readBaseFile(split: HoodieMergeOnReadFileSplit): 
(Iterator[InternalRow], HoodieTableSchema) = {
+    // NOTE: This is an optimization making sure that even for MOR tables we 
fetch absolute minimum
+    //       of the stored data possible, while still properly executing 
corresponding relation's semantic
+    //       and meet the query's requirements.
+    //
+    //       Here we assume that iff queried table
+    //          a) It does use one of the standard (and whitelisted) Record 
Payload classes
+    //       then we can avoid reading and parsing the records w/ _full_ 
schema, and instead only
+    //       rely on projected one, nevertheless being able to perform merging 
correctly
+    if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName))

Review comment:
      Not sure I follow — how exactly do you propose to change this expression? 

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -108,270 +136,284 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     }
   }
 
-  private def logFileIterator(split: HoodieMergeOnReadFileSplit,
-                              config: Configuration): Iterator[InternalRow] =
-    new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
-      private val tableAvroSchema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
-      private val requiredAvroSchema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
-      private val requiredFieldPosition =
-        requiredSchema.structTypeSchema
-          .map(f => tableAvroSchema.getField(f.name).pos()).toList
-      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
-      private val deserializer = 
sparkAdapter.createAvroDeserializer(requiredAvroSchema, 
requiredSchema.structTypeSchema)
-      private val unsafeProjection = 
UnsafeProjection.create(requiredSchema.structTypeSchema)
-      private var logScanner = HoodieMergeOnReadRDD.scanLog(split, 
tableAvroSchema, config)
-      private val logRecords = logScanner.getRecords
-      private val logRecordsKeyIterator = 
logRecords.keySet().iterator().asScala
-
-      private var recordToLoad: InternalRow = _
-
-      override def hasNext: Boolean = {
-        if (logRecordsKeyIterator.hasNext) {
-          val curAvrokey = logRecordsKeyIterator.next()
-          val curAvroRecord = 
logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
-          if (!curAvroRecord.isPresent) {
-            // delete record found, skipping
-            this.hasNext
-          } else {
-            val requiredAvroRecord = 
AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(), 
requiredAvroSchema,
-              requiredFieldPosition, recordBuilder)
-            val rowOpt = deserializer.deserialize(requiredAvroRecord)
-            recordToLoad = 
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-            true
+  /**
+   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all 
of the records stored in
+   * Delta Log files (represented as [[InternalRow]]s)
+   */
+  private class LogFileIterator(split: HoodieMergeOnReadFileSplit,
+                                config: Configuration)
+    extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport {
+
+    protected override val requiredAvroSchema: Schema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
+    protected override val requiredStructTypeSchema: StructType = 
requiredSchema.structTypeSchema
+
+    protected val logFileReaderAvroSchema: Schema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
+
+    protected val recordBuilder: GenericRecordBuilder = new 
GenericRecordBuilder(requiredAvroSchema)
+    protected var recordToLoad: InternalRow = _
+
+    // TODO validate whether we need to do UnsafeProjection
+    protected val unsafeProjection: UnsafeProjection = 
UnsafeProjection.create(requiredStructTypeSchema)
+
+    // NOTE: This maps _required_ schema fields onto the _full_ table schema, 
collecting their "ordinals"
+    //       w/in the record payload. This is required, to project records 
read from the Delta Log file
+    //       which always reads records in full schema (never projected, due 
to the fact that DL file might
+    //       be stored in non-columnar formats like Avro, HFile, etc)
+    private val requiredSchemaFieldOrdinals: List[Int] = 
collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)
+
+    private var logScanner =
+      HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), 
logFileReaderAvroSchema, tableState,
+        maxCompactionMemoryInBytes, config)
+
+    private val logRecords = logScanner.getRecords.asScala
+
+    // NOTE: This iterator iterates over already projected (in required 
schema) records
+    // NOTE: This have to stay lazy to make sure it's initialized only at the 
point where it's
+    //       going to be used, since we modify `logRecords` before that and 
therefore can't do it any earlier
+    protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] =
+      logRecords.iterator.map {
+        case (_, record) =>
+          val avroRecordOpt = 
toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, 
payloadProps))
+          avroRecordOpt.map {
+            avroRecord => projectAvroUnsafe(avroRecord, requiredAvroSchema, 
requiredSchemaFieldOrdinals, recordBuilder)
           }
-        } else {
-          false
-        }
       }
 
-      override def next(): InternalRow = {
-        recordToLoad
-      }
-
-      override def close(): Unit = {
-        if (logScanner != null) {
-          try {
-            logScanner.close()
-          } finally {
-            logScanner = null
-          }
-        }
-      }
-    }
+    protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: 
HoodieRecordPayload[_]]] =
+      logRecords.remove(key)
 
-  private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
-                                    baseFileIterator: Iterator[InternalRow],
-                                    config: Configuration): 
Iterator[InternalRow] =
-    new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
-      private val tableAvroSchema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
-      private val requiredAvroSchema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
-      private val requiredFieldPosition =
-        requiredSchema.structTypeSchema
-          .map(f => tableAvroSchema.getField(f.name).pos()).toList
-      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
-      private val deserializer = 
sparkAdapter.createAvroDeserializer(requiredAvroSchema, 
requiredSchema.structTypeSchema)
-      private val unsafeProjection = 
UnsafeProjection.create(requiredSchema.structTypeSchema)
-      private var logScanner = HoodieMergeOnReadRDD.scanLog(split, 
tableAvroSchema, config)
-      private val logRecords = logScanner.getRecords
-      private val logRecordsKeyIterator = 
logRecords.keySet().iterator().asScala
-
-      private var recordToLoad: InternalRow = _
-
-      @scala.annotation.tailrec
-      override def hasNext: Boolean = {
-        if (baseFileIterator.hasNext) {
-          val curRow = baseFileIterator.next()
-          recordToLoad = unsafeProjection(curRow)
-          true
+    override def hasNext: Boolean =
+      logRecordsIterator.hasNext && {
+        val avroRecordOpt = logRecordsIterator.next()
+        if (avroRecordOpt.isEmpty) {
+          // Record has been deleted, skipping
+          this.hasNext
         } else {
-          if (logRecordsKeyIterator.hasNext) {
-            val curAvrokey = logRecordsKeyIterator.next()
-            val curAvroRecord = 
logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
-            if (!curAvroRecord.isPresent) {
-              // delete record found, skipping
-              this.hasNext
-            } else {
-              val requiredAvroRecord = 
AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(), 
requiredAvroSchema,
-                requiredFieldPosition, recordBuilder)
-              val rowOpt = deserializer.deserialize(requiredAvroRecord)
-              recordToLoad = 
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-              true
-            }
-          } else {
-            false
-          }
+          recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get))
+          true
         }
       }
 
-      override def next(): InternalRow = {
-        recordToLoad
-      }
+    override final def next(): InternalRow = recordToLoad
 
-      override def close(): Unit = {
-        if (logScanner != null) {
-          try {
-            logScanner.close()
-          } finally {
-            logScanner = null
-          }
+    override def close(): Unit =
+      if (logScanner != null) {
+        try {
+          logScanner.close()
+        } finally {
+          logScanner = null
         }
       }
+  }
+
+  /**
+   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an 
iterator over all of the records stored in
+   * Base file as well as all of the Delta Log files simply returning 
concatenation of these streams, while not
+   * performing any combination/merging of the records w/ the same primary 
keys (ie producing duplicates potentially)
+   */
+  private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
+                                  baseFileIterator: Iterator[InternalRow],
+                                  config: Configuration)
+    extends LogFileIterator(split, config) {
+
+    override def hasNext: Boolean = {
+      if (baseFileIterator.hasNext) {
+        val curRow = baseFileIterator.next()
+        recordToLoad = unsafeProjection(curRow)
+        true
+      } else {
+        super[LogFileIterator].hasNext
+      }
     }
+  }
 
-  private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit,
-                                         baseFileIterator: 
Iterator[InternalRow],
-                                         config: Configuration): 
Iterator[InternalRow] =
-    new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
-      private val tableAvroSchema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
-      private val requiredAvroSchema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
-      private val requiredFieldPosition =
-        requiredSchema.structTypeSchema
-          .map(f => tableAvroSchema.getField(f.name).pos()).toList
-      private val serializer = 
sparkAdapter.createAvroSerializer(tableSchema.structTypeSchema, tableAvroSchema,
-        resolveAvroSchemaNullability(tableAvroSchema))
-      private val requiredDeserializer = 
sparkAdapter.createAvroDeserializer(requiredAvroSchema, 
requiredSchema.structTypeSchema)
-      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
-      private val unsafeProjection = 
UnsafeProjection.create(requiredSchema.structTypeSchema)
-      private var logScanner = HoodieMergeOnReadRDD.scanLog(split, 
tableAvroSchema, config)
-      private val logRecords = logScanner.getRecords
-      private val logRecordsKeyIterator = 
logRecords.keySet().iterator().asScala
-      private val keyToSkip = mutable.Set.empty[String]
-      private val recordKeyPosition = 
tableSchema.structTypeSchema.fieldIndex(recordKeyField)
-
-      private var recordToLoad: InternalRow = _
-
-      @scala.annotation.tailrec
-      override def hasNext: Boolean = {
-        if (baseFileIterator.hasNext) {
-          val curRow = baseFileIterator.next()
-          val curKey = curRow.getString(recordKeyPosition)
-          if (logRecords.containsKey(curKey)) {
-            // duplicate key found, merging
-            keyToSkip.add(curKey)
-            val mergedAvroRecord = mergeRowWithLog(curRow, curKey)
-            if (!mergedAvroRecord.isPresent) {
-              // deleted
-              this.hasNext
-            } else {
-              // load merged record as InternalRow with required schema
-              val requiredAvroRecord = 
AvroConversionUtils.buildAvroRecordBySchema(mergedAvroRecord.get(), 
requiredAvroSchema,
-                requiredFieldPosition, recordBuilder)
-              val rowOpt = requiredDeserializer.deserialize(requiredAvroRecord)
-              recordToLoad = 
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-              true
-            }
-          } else {
-            // No merge needed, load current row with required schema
-            recordToLoad = 
unsafeProjection(createInternalRowWithSchema(curRow, 
requiredSchema.structTypeSchema, requiredFieldPosition))
-            true
-          }
+  /**
+   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an 
iterator over all of the records stored in
+   * a) Base file and all of the b) Delta Log files combining records with the 
same primary key from both of these
+   * streams
+   */
+  private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
+                                          baseFileIterator: 
Iterator[InternalRow],
+                                          baseFileReaderSchema: 
HoodieTableSchema,
+                                          config: Configuration)
+    extends LogFileIterator(split, config) {
+
+    // NOTE: Record-merging iterator supports 2 modes of operation merging 
records bearing either
+    //        - Full table's schema
+    //        - Projected schema
+    //       As such, no particular schema could be assumed, and therefore we 
rely on the caller
+    //       to correspondingly set the scheme of the expected output of 
base-file reader
+    private val baseFileReaderAvroSchema = new 
Schema.Parser().parse(baseFileReaderSchema.avroSchemaStr)
+    private val requiredSchemaFieldOrdinals: List[Int] = 
collectFieldOrdinals(requiredAvroSchema, baseFileReaderAvroSchema)
+
+    private val serializer = 
sparkAdapter.createAvroSerializer(baseFileReaderSchema.structTypeSchema,
+      baseFileReaderAvroSchema, 
resolveAvroSchemaNullability(baseFileReaderAvroSchema))
+
+    private val recordKeyOrdinal = 
baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField)
+
+    override def hasNext: Boolean = {
+      if (baseFileIterator.hasNext) {
+        val curRowRecord = baseFileIterator.next()
+        val curKey = curRowRecord.getString(recordKeyOrdinal)
+        val updatedRecordOpt = removeLogRecord(curKey)
+        if (updatedRecordOpt.isEmpty) {
+          // No merge needed, load current row with required projected schema
+          recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord, 
requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals))
+          true
         } else {
-          if (logRecordsKeyIterator.hasNext) {
-            val curKey = logRecordsKeyIterator.next()
-            if (keyToSkip.contains(curKey)) {
-              this.hasNext
-            } else {
-              val insertAvroRecord = 
logRecords.get(curKey).getData.getInsertValue(tableAvroSchema, payloadProps)
-              if (!insertAvroRecord.isPresent) {
-                // stand alone delete record, skipping
-                this.hasNext
-              } else {
-                val requiredAvroRecord = AvroConversionUtils
-                  .buildAvroRecordBySchema(
-                    insertAvroRecord.get(),
-                    requiredAvroSchema,
-                    requiredFieldPosition,
-                    recordBuilder
-                  )
-                val rowOpt = 
requiredDeserializer.deserialize(requiredAvroRecord)
-                recordToLoad = 
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-                true
-              }
-            }
+          val mergedAvroRecordOpt = merge(serialize(curRowRecord), 
updatedRecordOpt.get)
+          if (mergedAvroRecordOpt.isEmpty) {
+            // Record has been deleted, skipping
+            this.hasNext
           } else {
-            false
+            // NOTE: In occurrence of a merge we can't know the schema of the 
record being returned, b/c
+            //       record from the Delta Log will bear (full) Table schema, 
while record from the Base file
+            //       might already be read in projected one (as an 
optimization).
+            //       As such we can't use more performant 
[[projectAvroUnsafe]], and instead have to fallback
+            //       to [[projectAvro]]
+            val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get, 
requiredAvroSchema, recordBuilder)
+            recordToLoad = unsafeProjection(deserialize(projectedAvroRecord))
+            true
           }
         }
+      } else {
+        super[LogFileIterator].hasNext
       }
+    }
 
-      override def next(): InternalRow = recordToLoad
-
-      override def close(): Unit = {
-        if (logScanner != null) {
-          try {
-            logScanner.close()
-          } finally {
-            logScanner = null
-          }
-        }
-      }
+    private def serialize(curRowRecord: InternalRow): GenericRecord =
+      serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
 
-      private def mergeRowWithLog(curRow: InternalRow, curKey: String) : 
org.apache.hudi.common.util.Option[IndexedRecord] = {
-        val historyAvroRecord = 
serializer.serialize(curRow).asInstanceOf[GenericRecord]
-        val mergedRec = logRecords.get(curKey).getData
-          .combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema, 
payloadProps)
-        if (mergedRec.isPresent && mergedRec.get().getSchema != 
tableAvroSchema) {
-          
org.apache.hudi.common.util.Option.of(HoodieAvroUtils.rewriteRecord(mergedRec.get().asInstanceOf[GenericRecord],
 tableAvroSchema).asInstanceOf[IndexedRecord])
-        } else {
-          mergedRec
-        }
-      }
+    private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ 
<: HoodieRecordPayload[_]]): Option[IndexedRecord] = {
+      // NOTE: We have to pass in Avro Schema used to read from Delta Log file 
since we invoke combining API
+      //       on the record from the Delta Log
+      toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, 
logFileReaderAvroSchema, payloadProps))
     }
+  }
 }
 
 private object HoodieMergeOnReadRDD {
+
   val CONFIG_INSTANTIATION_LOCK = new Object()
 
-  def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: 
Configuration): HoodieMergedLogRecordScanner = {
-    val fs = FSUtils.getFs(split.tablePath, config)
-    val logFiles = split.logFiles.get
+  def scanLog(logFiles: List[HoodieLogFile],
+              partitionPath: Path,
+              logSchema: Schema,
+              tableState: HoodieTableState,
+              maxCompactionMemoryInBytes: Long,
+              hadoopConf: Configuration): HoodieMergedLogRecordScanner = {

Review comment:
      I inlined `maxCompactionMemoryInBytes` into `HoodieMergeOnReadRDD` 
(previously it was passed in as an argument), but I don't think it makes sense to 
remove it from this parameter list here — it's not a simple field access; 
computing it requires a fair amount of work.

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -108,270 +139,284 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     }
   }
 
-  private def logFileIterator(split: HoodieMergeOnReadFileSplit,
-                              config: Configuration): Iterator[InternalRow] =
-    new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
-      private val tableAvroSchema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
-      private val requiredAvroSchema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
-      private val requiredFieldPosition =
-        requiredSchema.structTypeSchema
-          .map(f => tableAvroSchema.getField(f.name).pos()).toList
-      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
-      private val deserializer = 
sparkAdapter.createAvroDeserializer(requiredAvroSchema, 
requiredSchema.structTypeSchema)
-      private val unsafeProjection = 
UnsafeProjection.create(requiredSchema.structTypeSchema)
-      private var logScanner = HoodieMergeOnReadRDD.scanLog(split, 
tableAvroSchema, config)
-      private val logRecords = logScanner.getRecords
-      private val logRecordsKeyIterator = 
logRecords.keySet().iterator().asScala
-
-      private var recordToLoad: InternalRow = _
-
-      override def hasNext: Boolean = {
-        if (logRecordsKeyIterator.hasNext) {
-          val curAvrokey = logRecordsKeyIterator.next()
-          val curAvroRecord = 
logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
-          if (!curAvroRecord.isPresent) {
-            // delete record found, skipping
-            this.hasNext
-          } else {
-            val requiredAvroRecord = 
AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(), 
requiredAvroSchema,
-              requiredFieldPosition, recordBuilder)
-            val rowOpt = deserializer.deserialize(requiredAvroRecord)
-            recordToLoad = 
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-            true
+  /**
+   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all 
of the records stored in
+   * Delta Log files (represented as [[InternalRow]]s)
+   */
+  private class LogFileIterator(split: HoodieMergeOnReadFileSplit,
+                                config: Configuration)
+    extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport {
+
+    protected override val requiredAvroSchema: Schema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
+    protected override val requiredStructTypeSchema: StructType = 
requiredSchema.structTypeSchema
+
+    protected val logFileReaderAvroSchema: Schema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
+
+    protected val recordBuilder: GenericRecordBuilder = new 
GenericRecordBuilder(requiredAvroSchema)
+    protected var recordToLoad: InternalRow = _
+
+    // TODO validate whether we need to do UnsafeProjection
+    protected val unsafeProjection: UnsafeProjection = 
UnsafeProjection.create(requiredStructTypeSchema)
+
+    // NOTE: This maps _required_ schema fields onto the _full_ table schema, 
collecting their "ordinals"
+    //       w/in the record payload. This is required, to project records 
read from the Delta Log file
+    //       which always reads records in full schema (never projected, due 
to the fact that DL file might
+    //       be stored in non-columnar formats like Avro, HFile, etc)
+    private val requiredSchemaFieldOrdinals: List[Int] = 
collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)
+
+    private var logScanner =
+      HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), 
logFileReaderAvroSchema, tableState,
+        maxCompactionMemoryInBytes, config)
+
+    private val logRecords = logScanner.getRecords.asScala
+
+    // NOTE: This iterator iterates over already projected (in required 
schema) records
+    // NOTE: This have to stay lazy to make sure it's initialized only at the 
point where it's
+    //       going to be used, since we modify `logRecords` before that and 
therefore can't do it any earlier
+    protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] =
+      logRecords.iterator.map {
+        case (_, record) =>
+          val avroRecordOpt = 
toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, 
payloadProps))
+          avroRecordOpt.map {
+            avroRecord => projectAvroUnsafe(avroRecord, requiredAvroSchema, 
requiredSchemaFieldOrdinals, recordBuilder)
           }
-        } else {
-          false
-        }
       }
 
-      override def next(): InternalRow = {
-        recordToLoad
-      }
-
-      override def close(): Unit = {
-        if (logScanner != null) {
-          try {
-            logScanner.close()
-          } finally {
-            logScanner = null
-          }
-        }
-      }
-    }
+    protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: 
HoodieRecordPayload[_]]] =
+      logRecords.remove(key)
 
-  private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
-                                    baseFileIterator: Iterator[InternalRow],
-                                    config: Configuration): 
Iterator[InternalRow] =
-    new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
-      private val tableAvroSchema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
-      private val requiredAvroSchema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
-      private val requiredFieldPosition =
-        requiredSchema.structTypeSchema
-          .map(f => tableAvroSchema.getField(f.name).pos()).toList
-      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
-      private val deserializer = 
sparkAdapter.createAvroDeserializer(requiredAvroSchema, 
requiredSchema.structTypeSchema)
-      private val unsafeProjection = 
UnsafeProjection.create(requiredSchema.structTypeSchema)
-      private var logScanner = HoodieMergeOnReadRDD.scanLog(split, 
tableAvroSchema, config)
-      private val logRecords = logScanner.getRecords
-      private val logRecordsKeyIterator = 
logRecords.keySet().iterator().asScala
-
-      private var recordToLoad: InternalRow = _
-
-      @scala.annotation.tailrec
-      override def hasNext: Boolean = {
-        if (baseFileIterator.hasNext) {
-          val curRow = baseFileIterator.next()
-          recordToLoad = unsafeProjection(curRow)
-          true
+    override def hasNext: Boolean =
+      logRecordsIterator.hasNext && {
+        val avroRecordOpt = logRecordsIterator.next()
+        if (avroRecordOpt.isEmpty) {
+          // Record has been deleted, skipping
+          this.hasNext
         } else {
-          if (logRecordsKeyIterator.hasNext) {
-            val curAvrokey = logRecordsKeyIterator.next()
-            val curAvroRecord = 
logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
-            if (!curAvroRecord.isPresent) {
-              // delete record found, skipping
-              this.hasNext
-            } else {
-              val requiredAvroRecord = 
AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(), 
requiredAvroSchema,
-                requiredFieldPosition, recordBuilder)
-              val rowOpt = deserializer.deserialize(requiredAvroRecord)
-              recordToLoad = 
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-              true
-            }
-          } else {
-            false
-          }
+          recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get))
+          true
         }
       }
 
-      override def next(): InternalRow = {
-        recordToLoad
-      }
+    override final def next(): InternalRow = recordToLoad
 
-      override def close(): Unit = {
-        if (logScanner != null) {
-          try {
-            logScanner.close()
-          } finally {
-            logScanner = null
-          }
+    override def close(): Unit =
+      if (logScanner != null) {
+        try {
+          logScanner.close()
+        } finally {
+          logScanner = null
         }
       }
+  }
+
+  /**
+   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an 
iterator over all of the records stored in
+   * Base file as well as all of the Delta Log files simply returning 
concatenation of these streams, while not
+   * performing any combination/merging of the records w/ the same primary 
keys (ie producing duplicates potentially)
+   */
+  private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
+                                  baseFileIterator: Iterator[InternalRow],
+                                  config: Configuration)
+    extends LogFileIterator(split, config) {
+
+    override def hasNext: Boolean = {
+      if (baseFileIterator.hasNext) {
+        val curRow = baseFileIterator.next()
+        recordToLoad = unsafeProjection(curRow)
+        true
+      } else {
+        super[LogFileIterator].hasNext
+      }
     }
+  }
 
-  private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit,
-                                         baseFileIterator: 
Iterator[InternalRow],
-                                         config: Configuration): 
Iterator[InternalRow] =
-    new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
-      private val tableAvroSchema = new 
Schema.Parser().parse(tableSchema.avroSchemaStr)
-      private val requiredAvroSchema = new 
Schema.Parser().parse(requiredSchema.avroSchemaStr)
-      private val requiredFieldPosition =
-        requiredSchema.structTypeSchema
-          .map(f => tableAvroSchema.getField(f.name).pos()).toList
-      private val serializer = 
sparkAdapter.createAvroSerializer(tableSchema.structTypeSchema, tableAvroSchema,
-        resolveAvroSchemaNullability(tableAvroSchema))
-      private val requiredDeserializer = 
sparkAdapter.createAvroDeserializer(requiredAvroSchema, 
requiredSchema.structTypeSchema)
-      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
-      private val unsafeProjection = 
UnsafeProjection.create(requiredSchema.structTypeSchema)
-      private var logScanner = HoodieMergeOnReadRDD.scanLog(split, 
tableAvroSchema, config)
-      private val logRecords = logScanner.getRecords
-      private val logRecordsKeyIterator = 
logRecords.keySet().iterator().asScala
-      private val keyToSkip = mutable.Set.empty[String]
-      private val recordKeyPosition = 
tableSchema.structTypeSchema.fieldIndex(recordKeyField)
-
-      private var recordToLoad: InternalRow = _
-
-      @scala.annotation.tailrec
-      override def hasNext: Boolean = {
-        if (baseFileIterator.hasNext) {
-          val curRow = baseFileIterator.next()
-          val curKey = curRow.getString(recordKeyPosition)
-          if (logRecords.containsKey(curKey)) {
-            // duplicate key found, merging
-            keyToSkip.add(curKey)
-            val mergedAvroRecord = mergeRowWithLog(curRow, curKey)
-            if (!mergedAvroRecord.isPresent) {
-              // deleted
-              this.hasNext
-            } else {
-              // load merged record as InternalRow with required schema
-              val requiredAvroRecord = 
AvroConversionUtils.buildAvroRecordBySchema(mergedAvroRecord.get(), 
requiredAvroSchema,
-                requiredFieldPosition, recordBuilder)
-              val rowOpt = requiredDeserializer.deserialize(requiredAvroRecord)
-              recordToLoad = 
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-              true
-            }
-          } else {
-            // No merge needed, load current row with required schema
-            recordToLoad = 
unsafeProjection(createInternalRowWithSchema(curRow, 
requiredSchema.structTypeSchema, requiredFieldPosition))
-            true
-          }
+  /**
+   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an 
iterator over all of the records stored in
+   * a) Base file and all of the b) Delta Log files combining records with the 
same primary key from both of these
+   * streams
+   */
+  private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
+                                          baseFileIterator: 
Iterator[InternalRow],
+                                          baseFileReaderSchema: 
HoodieTableSchema,
+                                          config: Configuration)
+    extends LogFileIterator(split, config) {
+
+    // NOTE: Record-merging iterator supports 2 modes of operation merging 
records bearing either
+    //        - Full table's schema
+    //        - Projected schema
+    //       As such, no particular schema could be assumed, and therefore we 
rely on the caller
+    //       to correspondingly set the schema of the expected output of 
the base-file reader
+    private val baseFileReaderAvroSchema = new 
Schema.Parser().parse(baseFileReaderSchema.avroSchemaStr)
+    private val requiredSchemaFieldOrdinals: List[Int] = 
collectFieldOrdinals(requiredAvroSchema, baseFileReaderAvroSchema)
+
+    private val serializer = 
sparkAdapter.createAvroSerializer(baseFileReaderSchema.structTypeSchema,
+      baseFileReaderAvroSchema, 
resolveAvroSchemaNullability(baseFileReaderAvroSchema))
+
+    private val recordKeyOrdinal = 
baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField)
+
+    override def hasNext: Boolean = {
+      if (baseFileIterator.hasNext) {
+        val curRowRecord = baseFileIterator.next()
+        val curKey = curRowRecord.getString(recordKeyOrdinal)
+        val updatedRecordOpt = removeLogRecord(curKey)
+        if (updatedRecordOpt.isEmpty) {
+          // No merge needed, load current row with required projected schema
+          recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord, 
requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals))
+          true
         } else {
-          if (logRecordsKeyIterator.hasNext) {
-            val curKey = logRecordsKeyIterator.next()
-            if (keyToSkip.contains(curKey)) {
-              this.hasNext
-            } else {
-              val insertAvroRecord = 
logRecords.get(curKey).getData.getInsertValue(tableAvroSchema, payloadProps)
-              if (!insertAvroRecord.isPresent) {
-                // stand alone delete record, skipping
-                this.hasNext
-              } else {
-                val requiredAvroRecord = AvroConversionUtils
-                  .buildAvroRecordBySchema(
-                    insertAvroRecord.get(),
-                    requiredAvroSchema,
-                    requiredFieldPosition,
-                    recordBuilder
-                  )
-                val rowOpt = 
requiredDeserializer.deserialize(requiredAvroRecord)
-                recordToLoad = 
unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-                true
-              }
-            }
+          val mergedAvroRecordOpt = merge(serialize(curRowRecord), 
updatedRecordOpt.get)
+          if (mergedAvroRecordOpt.isEmpty) {
+            // Record has been deleted, skipping
+            this.hasNext
           } else {
-            false
+            // NOTE: When a merge occurs we can't know the schema of the 
record being returned, b/c
+            //       record from the Delta Log will bear (full) Table schema, 
while record from the Base file
+            //       might already be read in projected one (as an 
optimization).
+            //       As such we can't use more performant 
[[projectAvroUnsafe]], and instead have to fallback
+            //       to [[projectAvro]]
+            val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get, 
requiredAvroSchema, recordBuilder)
+            recordToLoad = unsafeProjection(deserialize(projectedAvroRecord))
+            true
           }
         }
+      } else {
+        super[LogFileIterator].hasNext
       }
+    }
 
-      override def next(): InternalRow = recordToLoad
-
-      override def close(): Unit = {
-        if (logScanner != null) {
-          try {
-            logScanner.close()
-          } finally {
-            logScanner = null
-          }
-        }
-      }
+    private def serialize(curRowRecord: InternalRow): GenericRecord =
+      serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
 
-      private def mergeRowWithLog(curRow: InternalRow, curKey: String) : 
org.apache.hudi.common.util.Option[IndexedRecord] = {
-        val historyAvroRecord = 
serializer.serialize(curRow).asInstanceOf[GenericRecord]
-        val mergedRec = logRecords.get(curKey).getData
-          .combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema, 
payloadProps)
-        if (mergedRec.isPresent && mergedRec.get().getSchema != 
tableAvroSchema) {
-          
org.apache.hudi.common.util.Option.of(HoodieAvroUtils.rewriteRecord(mergedRec.get().asInstanceOf[GenericRecord],
 tableAvroSchema).asInstanceOf[IndexedRecord])
-        } else {
-          mergedRec
-        }
-      }
+    private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ 
<: HoodieRecordPayload[_]]): Option[IndexedRecord] = {
+      // NOTE: We have to pass in Avro Schema used to read from Delta Log file 
since we invoke combining API
+      //       on the record from the Delta Log
+      toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, 
logFileReaderAvroSchema, payloadProps))
     }
+  }
 }
 
 private object HoodieMergeOnReadRDD {
+
   val CONFIG_INSTANTIATION_LOCK = new Object()
 
-  def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: 
Configuration): HoodieMergedLogRecordScanner = {
-    val fs = FSUtils.getFs(split.tablePath, config)
-    val logFiles = split.logFiles.get
+  def scanLog(logFiles: List[HoodieLogFile],
+              partitionPath: Path,
+              logSchema: Schema,
+              tableState: HoodieTableState,
+              maxCompactionMemoryInBytes: Long,
+              hadoopConf: Configuration): HoodieMergedLogRecordScanner = {
+    val tablePath = tableState.tablePath
+    val fs = FSUtils.getFs(tablePath, hadoopConf)
 
-    if (HoodieTableMetadata.isMetadataTable(split.tablePath)) {
+    if (HoodieTableMetadata.isMetadataTable(tablePath)) {
       val metadataConfig = 
HoodieMetadataConfig.newBuilder().enable(true).build()
-      val dataTableBasePath = 
getDataTableBasePathFromMetadataTable(split.tablePath)
+      val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath)
       val metadataTable = new HoodieBackedTableMetadata(
-        new HoodieLocalEngineContext(config), metadataConfig,
+        new HoodieLocalEngineContext(hadoopConf), metadataConfig,
         dataTableBasePath,
-        config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, 
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+        hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, 
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
 
       // NOTE: In case of Metadata Table partition path equates to partition 
name (since there's just one level
       //       of indirection among MT partitions)
-      val relativePartitionPath = getRelativePartitionPath(new 
Path(split.tablePath), getPartitionPath(split))
+      val relativePartitionPath = getRelativePartitionPath(new 
Path(tablePath), partitionPath)
       metadataTable.getLogRecordScanner(logFiles.asJava, 
relativePartitionPath).getLeft
     } else {
       val logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
         .withFileSystem(fs)
-        .withBasePath(split.tablePath)
-        .withLogFilePaths(split.logFiles.get.map(logFile => 
getFilePath(logFile.getPath)).asJava)
+        .withBasePath(tablePath)
+        .withLogFilePaths(logFiles.map(logFile => 
getFilePath(logFile.getPath)).asJava)
         .withReaderSchema(logSchema)
-        .withLatestInstantTime(split.latestCommit)
+        .withLatestInstantTime(tableState.latestCommitTimestamp)
         .withReadBlocksLazily(
-          
Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
+          
Try(hadoopConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
             
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
             .getOrElse(false))
         .withReverseReader(false)
         .withBufferSize(
-          config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
+          
hadoopConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
             HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-        .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
+        .withMaxMemorySizeInBytes(maxCompactionMemoryInBytes)
         .withSpillableMapBasePath(
-          config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
+          hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
             HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
 
       if (logFiles.nonEmpty) {
-        logRecordScannerBuilder.withPartition(getRelativePartitionPath(new 
Path(split.tablePath), logFiles.head.getPath.getParent))
+        logRecordScannerBuilder.withPartition(
+          getRelativePartitionPath(new Path(tableState.tablePath), 
logFiles.head.getPath.getParent))
       }
 
       logRecordScannerBuilder.build()
     }
   }
 
+  /**
+   * Projects provided instance of [[InternalRow]] into provided schema, 
assuming that the
+   * schema of the original row is strictly a superset of the given one
+   */
+  private def projectRowUnsafe(row: InternalRow,
+                         projectedSchema: StructType,
+                         ordinals: Seq[Int]): InternalRow = {
+    val projectedRow = new SpecificInternalRow(projectedSchema)
+    var curIndex = 0
+    projectedSchema.zip(ordinals).foreach { case (field, pos) =>
+      val curField = if (row.isNullAt(pos)) {
+        null
+      } else {
+        row.get(pos, field.dataType)
+      }
+      projectedRow.update(curIndex, curField)
+      curIndex += 1
+    }
+    projectedRow
+  }
+
+  /**
+   * Projects provided instance of [[IndexedRecord]] into provided schema, 
assuming that the
+   * schema of the original row is strictly a superset of the given one
+   */
+  def projectAvroUnsafe(record: IndexedRecord,
+                        projectedSchema: Schema,
+                        ordinals: List[Int],
+                        recordBuilder: GenericRecordBuilder): GenericRecord = {
+    val fields = projectedSchema.getFields.asScala
+    assert(fields.length == ordinals.length)

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to