alexeykudinkin commented on a change in pull request #4888:
URL: https://github.com/apache/hudi/pull/4888#discussion_r832629643
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -70,34 +73,59 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
)
.getOrElse(new Properties())
+ private val whitelistedPayloadClasses: Set[String] = Seq(
+ classOf[OverwriteWithLatestAvroPayload]
+ ).map(_.getName).toSet
+
override def compute(split: Partition, context: TaskContext):
Iterator[InternalRow] = {
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
val iter = mergeOnReadPartition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
- requiredSchemaFileReader(dataFileOnlySplit.dataFile.get)
+ requiredSchemaFileReader.apply(dataFileOnlySplit.dataFile.get)
+
case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
- logFileIterator(logFileOnlySplit, getConfig)
- case skipMergeSplit if
skipMergeSplit.mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
=>
- skipMergeFileIterator(skipMergeSplit,
requiredSchemaFileReader(skipMergeSplit.dataFile.get), getConfig)
- case payloadCombineSplit
- if
payloadCombineSplit.mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL)
=>
- payloadCombineFileIterator(payloadCombineSplit,
fullSchemaFileReader(payloadCombineSplit.dataFile.get),
- getConfig)
+ new LogFileIterator(logFileOnlySplit, getConfig)
+
+ case split if
mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
+ val baseFileIterator =
requiredSchemaFileReader.apply(split.dataFile.get)
+ new SkipMergeIterator(split, baseFileIterator, getConfig)
+
+ case split if
mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
+ val (baseFileIterator, schema) = readBaseFile(split)
+ new RecordMergingFileIterator(split, baseFileIterator, schema,
getConfig)
+
case _ => throw new HoodieException(s"Unable to select an Iterator to
read the Hoodie MOR File Split for " +
s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" +
s"log paths: ${mergeOnReadPartition.split.logFiles.toString}" +
- s"hoodie table path: ${mergeOnReadPartition.split.tablePath}" +
+ s"hoodie table path: ${tableState.tablePath}" +
s"spark partition Index: ${mergeOnReadPartition.index}" +
- s"merge type: ${mergeOnReadPartition.split.mergeType}")
+ s"merge type: ${mergeType}")
}
+
if (iter.isInstanceOf[Closeable]) {
// register a callback to close logScanner which will be executed on
task completion.
// when tasks finished, this method will be called, and release
resources.
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ =>
iter.asInstanceOf[Closeable].close()))
}
+
iter
}
+ private def readBaseFile(split: HoodieMergeOnReadFileSplit):
(Iterator[InternalRow], HoodieTableSchema) = {
+ // NOTE: This is an optimization making sure that even for MOR tables we
fetch absolute minimum
+ // of the stored data possible, while still properly executing
corresponding relation's semantic
+ // and meet the query's requirements.
+ //
+ // Here we assume that iff queried table
+ // a) It does use one of the standard (and whitelisted) Record
Payload classes
+ // then we can avoid reading and parsing the records w/ _full_
schema, and instead only
+ // rely on projected one, nevertheless being able to perform merging
correctly
+ if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName))
+ (fullSchemaFileReader(split.dataFile.get), tableSchema)
Review comment:
It actually was `fullSchema` at some point, but that still left room for
confusion regarding what this full schema might refer to, so I've renamed
it to `tableSchema`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]