This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 39f2a06c85 [HUDI-3979] Optimize out mandatory columns when no merging
is performed (#5430)
39f2a06c85 is described below
commit 39f2a06c85d237c51c3bf2b1e2f6876e7eb65e06
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Fri Jul 22 15:32:44 2022 -0700
[HUDI-3979] Optimize out mandatory columns when no merging is performed
(#5430)
For MOR, when no merging is performed there is no point in reading either
primary-key or pre-combine-key values (unless query is referencing these).
Avoiding reading these can potentially save the substantial resources otherwise
wasted on reading them.
---
.../org/apache/hudi/BaseFileOnlyRelation.scala | 5 +-
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 100 +++++++++++--------
.../org/apache/hudi/HoodieMergeOnReadRDD.scala | 24 +++--
.../hudi/MergeOnReadIncrementalRelation.scala | 29 +++---
.../apache/hudi/MergeOnReadSnapshotRelation.scala | 109 ++++++++++++++++++---
.../org/apache/hudi/TestHoodieRelations.scala | 2 -
.../functional/TestParquetColumnProjection.scala | 2 +-
.../apache/spark/sql/hudi/TestInsertTable.scala | 1 +
8 files changed, 187 insertions(+), 85 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index 29b565712d..b7033c3bfc 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -60,9 +60,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
override protected val shouldExtractPartitionValuesFromPartitionPath:
Boolean =
internalSchemaOpt.isEmpty
- override lazy val mandatoryFields: Seq[String] =
- // TODO reconcile, record's key shouldn't be mandatory for base-file only
relation
- Seq(recordKeyField)
+ override lazy val mandatoryFields: Seq[String] = Seq.empty
override def imbueConfigs(sqlContext: SQLContext): Unit = {
super.imbueConfigs(sqlContext)
@@ -73,6 +71,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
partitionSchema: StructType,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
+ requestedColumns: Array[String],
filters: Array[Filter]): HoodieUnsafeRDD =
{
val baseFileReader = createBaseFileReader(
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index f2d2c31f67..ff6515db32 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hadoop.mapred.JobConf
-import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema,
createHFileReader, generateUnsafeProjection, getPartitionPath, projectSchema}
+import org.apache.hudi.HoodieBaseRelation.{BaseFileReader,
convertToAvroSchema, createHFileReader, generateUnsafeProjection,
getPartitionPath, projectSchema}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
@@ -204,6 +204,10 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
}
+ /**
+ * NOTE: These fields are accessed by the [[NestedSchemaPruning]] component,
which is only enabled for
+ * Spark >= 3.1
+ */
lazy val (fileFormat: FileFormat, fileFormatClassName: String) =
metaClient.getTableConfig.getBaseFileFormat match {
case HoodieFileFormat.ORC => (new OrcFileFormat, "orc")
@@ -258,12 +262,11 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
*
* Check scala-doc for [[shouldExtractPartitionValuesFromPartitionPath]] for
more details
*/
- def dataSchema: StructType =
- if (shouldExtractPartitionValuesFromPartitionPath) {
- prunePartitionColumns(tableStructSchema)
- } else {
- tableStructSchema
- }
+ def dataSchema: StructType = if
(shouldExtractPartitionValuesFromPartitionPath) {
+ prunePartitionColumns(tableStructSchema)
+ } else {
+ tableStructSchema
+ }
/**
* Determines whether relation's schema could be pruned by Spark's Optimizer
@@ -346,7 +349,7 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
if (fileSplits.isEmpty) {
sparkSession.sparkContext.emptyRDD
} else {
- val rdd = composeRDD(fileSplits, partitionSchema, dataSchema,
requiredDataSchema, filters)
+ val rdd = composeRDD(fileSplits, partitionSchema, dataSchema,
requiredDataSchema, targetColumns, filters)
// NOTE: In case when partition columns have been pruned from the
required schema, we have to project
// the rows from the pruned schema back into the one expected by
the caller
@@ -369,17 +372,19 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
/**
* Composes RDD provided file splits to read from, table and partition
schemas, data filters to be applied
*
- * @param fileSplits file splits to be handled by the RDD
- * @param partitionSchema target table's partition schema
- * @param dataSchema target table's data files' schema
- * @param requiredSchema projected schema required by the reader
- * @param filters data filters to be applied
+ * @param fileSplits file splits to be handled by the RDD
+ * @param partitionSchema target table's partition schema
+ * @param dataSchema target table's data files' schema
+ * @param requiredSchema projected schema required by the reader
+ * @param requestedColumns columns requested by the query
+ * @param filters data filters to be applied
* @return instance of RDD (implementing [[HoodieUnsafeRDD]])
*/
protected def composeRDD(fileSplits: Seq[FileSplit],
partitionSchema: StructType,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
+ requestedColumns: Array[String],
filters: Array[Filter]): HoodieUnsafeRDD
/**
@@ -551,37 +556,48 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
requiredSchema: HoodieTableSchema,
filters: Seq[Filter],
options: Map[String, String],
- hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
- val hfileReader = createHFileReader(
- spark = spark,
- dataSchema = dataSchema,
- requiredSchema = requiredSchema,
- filters = filters,
- options = options,
- hadoopConf = hadoopConf
- )
-
- val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
- sparkSession = spark,
- dataSchema = dataSchema.structTypeSchema,
- partitionSchema = partitionSchema,
- requiredSchema = requiredSchema.structTypeSchema,
- filters = filters,
- options = options,
- hadoopConf = hadoopConf,
- // We're delegating to Spark to append partition values to every row
only in cases
- // when these corresponding partition-values are not persisted w/in the
data file itself
- appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
- )
+ hadoopConf: Configuration):
BaseFileReader = {
+ val tableBaseFileFormat = tableConfig.getBaseFileFormat
+
+ // NOTE: PLEASE READ CAREFULLY
+ // Lambda returned from this method is going to be invoked on the
executor, and therefore
+ // we have to eagerly initialize all of the readers even though only
one specific to the type
+ // of the file being read will be used. This is required to avoid
serialization of the whole
+ // relation (containing file-index for ex) and passing it to the
executor
+ val reader = tableBaseFileFormat match {
+ case HoodieFileFormat.PARQUET =>
+ HoodieDataSourceHelper.buildHoodieParquetReader(
+ sparkSession = spark,
+ dataSchema = dataSchema.structTypeSchema,
+ partitionSchema = partitionSchema,
+ requiredSchema = requiredSchema.structTypeSchema,
+ filters = filters,
+ options = options,
+ hadoopConf = hadoopConf,
+ // We're delegating to Spark to append partition values to every row
only in cases
+ // when these corresponding partition-values are not persisted w/in
the data file itself
+ appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
+ )
+
+ case HoodieFileFormat.HFILE =>
+ createHFileReader(
+ spark = spark,
+ dataSchema = dataSchema,
+ requiredSchema = requiredSchema,
+ filters = filters,
+ options = options,
+ hadoopConf = hadoopConf
+ )
+
+ case _ => throw new UnsupportedOperationException(s"Base file format is
not currently supported ($tableBaseFileFormat)")
+ }
partitionedFile => {
val extension = FSUtils.getFileExtension(partitionedFile.filePath)
- if (HoodieFileFormat.PARQUET.getFileExtension.equals(extension)) {
- parquetReader.apply(partitionedFile)
- } else if (HoodieFileFormat.HFILE.getFileExtension.equals(extension)) {
- hfileReader.apply(partitionedFile)
+ if (tableBaseFileFormat.getFileExtension.equals(extension)) {
+ reader.apply(partitionedFile)
} else {
- throw new UnsupportedOperationException(s"Base file format not
supported by Spark DataSource ($partitionedFile)")
+ throw new UnsupportedOperationException(s"Invalid base-file format
($extension), expected ($tableBaseFileFormat)")
}
}
}
@@ -629,6 +645,8 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
object HoodieBaseRelation extends SparkAdapterSupport {
+ type BaseFileReader = PartitionedFile => Iterator[InternalRow]
+
private def generateUnsafeProjection(from: StructType, to: StructType) =
sparkAdapter.getCatalystExpressionUtils().generateUnsafeProjection(from,
to)
@@ -676,7 +694,7 @@ object HoodieBaseRelation extends SparkAdapterSupport {
requiredSchema: HoodieTableSchema,
filters: Seq[Filter],
options: Map[String, String],
- hadoopConf: Configuration): PartitionedFile =>
Iterator[InternalRow] = {
+ hadoopConf: Configuration): BaseFileReader = {
val hadoopConfBroadcast =
spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index c9080d021e..c4c70cb414 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -23,6 +23,7 @@ import org.apache.avro.generic.{GenericRecord,
GenericRecordBuilder, IndexedReco
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.HoodieBaseRelation.BaseFileReader
import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport,
collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe,
projectRowUnsafe, resolveAvroSchemaNullability}
import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
@@ -55,11 +56,14 @@ import scala.util.Try
case class HoodieMergeOnReadPartition(index: Int, split:
HoodieMergeOnReadFileSplit) extends Partition
+case class HoodieMergeOnReadBaseFileReaders(fullSchemaFileReader:
BaseFileReader,
+
requiredSchemaFileReaderForMerging: BaseFileReader,
+
requiredSchemaFileReaderForNoMerging: BaseFileReader)
+
class HoodieMergeOnReadRDD(@transient sc: SparkContext,
@transient config: Configuration,
- fullSchemaFileReader: PartitionedFile =>
Iterator[InternalRow],
- requiredSchemaFileReader: PartitionedFile =>
Iterator[InternalRow],
- tableSchema: HoodieTableSchema,
+ fileReaders: HoodieMergeOnReadBaseFileReaders,
+ dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
tableState: HoodieTableState,
mergeType: String,
@@ -86,13 +90,13 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
val iter = mergeOnReadPartition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
- requiredSchemaFileReader.apply(dataFileOnlySplit.dataFile.get)
+
fileReaders.requiredSchemaFileReaderForNoMerging.apply(dataFileOnlySplit.dataFile.get)
case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
new LogFileIterator(logFileOnlySplit, getConfig)
case split if
mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
- val baseFileIterator =
requiredSchemaFileReader.apply(split.dataFile.get)
+ val baseFileIterator =
fileReaders.requiredSchemaFileReaderForNoMerging.apply(split.dataFile.get)
new SkipMergeIterator(split, baseFileIterator, getConfig)
case split if
mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
@@ -126,9 +130,9 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
// then we can avoid reading and parsing the records w/ _full_
schema, and instead only
// rely on projected one, nevertheless being able to perform merging
correctly
if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName))
- (fullSchemaFileReader(split.dataFile.get), tableSchema)
+ (fileReaders.fullSchemaFileReader(split.dataFile.get), dataSchema)
else
- (requiredSchemaFileReader(split.dataFile.get), requiredSchema)
+ (fileReaders.requiredSchemaFileReaderForMerging(split.dataFile.get),
requiredSchema)
}
override protected def getPartitions: Array[Partition] =
@@ -152,7 +156,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
protected override val requiredAvroSchema: Schema = new
Schema.Parser().parse(requiredSchema.avroSchemaStr)
protected override val requiredStructTypeSchema: StructType =
requiredSchema.structTypeSchema
- protected val logFileReaderAvroSchema: Schema = new
Schema.Parser().parse(tableSchema.avroSchemaStr)
+ protected val logFileReaderAvroSchema: Schema = new
Schema.Parser().parse(dataSchema.avroSchemaStr)
protected val recordBuilder: GenericRecordBuilder = new
GenericRecordBuilder(requiredAvroSchema)
protected var recordToLoad: InternalRow = _
@@ -167,7 +171,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
private val requiredSchemaFieldOrdinals: List[Int] =
collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)
private var logScanner = {
- val internalSchema =
tableSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
+ val internalSchema =
dataSchema.internalSchema.getOrElse(InternalSchema.getEmptyInternalSchema)
HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split),
logFileReaderAvroSchema, tableState,
maxCompactionMemoryInBytes, config, internalSchema)
}
@@ -232,7 +236,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
override def hasNext: Boolean = {
if (baseFileIterator.hasNext) {
val curRow = baseFileIterator.next()
- recordToLoad = unsafeProjection(curRow)
+ recordToLoad = curRow
true
} else {
super[LogFileIterator].hasNext
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 38945cec9f..6fa130ac8c 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -61,6 +61,7 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
partitionSchema: StructType,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
+ requestedColumns: Array[String],
filters: Array[Filter]):
HoodieMergeOnReadRDD = {
val fullSchemaParquetReader = createBaseFileReader(
spark = sqlContext.sparkSession,
@@ -81,23 +82,25 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
hadoopConf = embedInternalSchema(new Configuration(conf),
internalSchemaOpt)
)
- val requiredSchemaParquetReader = createBaseFileReader(
- spark = sqlContext.sparkSession,
- partitionSchema = partitionSchema,
- dataSchema = dataSchema,
- requiredSchema = requiredSchema,
- filters = filters ++ incrementalSpanRecordFilters,
- options = optParams,
- // NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
- // to configure Parquet reader appropriately
- hadoopConf = embedInternalSchema(new Configuration(conf),
requiredSchema.internalSchema)
- )
+ val (requiredSchemaBaseFileReaderMerging,
requiredSchemaBaseFileReaderNoMerging) =
+ createMergeOnReadBaseFileReaders(partitionSchema, dataSchema,
requiredSchema, requestedColumns, filters ++ incrementalSpanRecordFilters)
val hoodieTableState = getTableState
// TODO(HUDI-3639) implement incremental span record filtering w/in RDD to
make sure returned iterator is appropriately
// filtered, since file-reader might not be capable to
perform filtering
- new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf,
fullSchemaParquetReader, requiredSchemaParquetReader,
- dataSchema, requiredSchema, hoodieTableState, mergeType, fileSplits)
+ new HoodieMergeOnReadRDD(
+ sqlContext.sparkContext,
+ config = jobConf,
+ fileReaders = HoodieMergeOnReadBaseFileReaders(
+ fullSchemaFileReader = fullSchemaParquetReader,
+ requiredSchemaFileReaderForMerging =
requiredSchemaBaseFileReaderMerging,
+ requiredSchemaFileReaderForNoMerging =
requiredSchemaBaseFileReaderNoMerging
+ ),
+ dataSchema = dataSchema,
+ requiredSchema = requiredSchema,
+ tableState = hoodieTableState,
+ mergeType = mergeType,
+ fileSplits = fileSplits)
}
override protected def collectFileSplits(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 9c69190f98..c6d4eafafc 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -20,14 +20,17 @@ package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
+import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, convertToAvroSchema}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath
+import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.sources.Filter
@@ -47,9 +50,27 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
override type FileSplit = HoodieMergeOnReadFileSplit
- override lazy val mandatoryFields: Seq[String] =
+ /**
+ * NOTE: These are the fields that are required to properly fulfil
Merge-on-Read (MOR)
+ * semantic:
+ *
+ * <ol>
+ * <li>Primary key is required to make sure we're able to correlate
records from the base
+ * file with the updated records from the delta-log file</li>
+ * <li>Pre-combine key is required to properly perform the combining
(or merging) of the
+ * existing and updated records</li>
+ * </ol>
+ *
+ * However, in cases when merging is NOT performed (for example, if the
file-group only contains base
+ * files but no delta-log files, or if the query-type is equal to
[["skip_merge"]]), neither the
+ * primary key nor the pre-combine key is required to be fetched from
storage (unless requested
+ * by the query), thereby saving on throughput
+ */
+ protected lazy val mandatoryFieldsForMerging: Seq[String] =
Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
+ override lazy val mandatoryFields: Seq[String] = mandatoryFieldsForMerging
+
protected val mergeType: String =
optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
DataSourceReadOptions.REALTIME_MERGE.defaultValue)
@@ -62,8 +83,9 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
partitionSchema: StructType,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
+ requestedColumns: Array[String],
filters: Array[Filter]):
HoodieMergeOnReadRDD = {
- val fullSchemaParquetReader = createBaseFileReader(
+ val fullSchemaBaseFileReader = createBaseFileReader(
spark = sqlContext.sparkSession,
partitionSchema = partitionSchema,
dataSchema = dataSchema,
@@ -79,21 +101,23 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
hadoopConf = embedInternalSchema(new Configuration(conf),
internalSchemaOpt)
)
- val requiredSchemaParquetReader = createBaseFileReader(
- spark = sqlContext.sparkSession,
- partitionSchema = partitionSchema,
- dataSchema = dataSchema,
- requiredSchema = requiredSchema,
- filters = filters,
- options = optParams,
- // NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
- // to configure Parquet reader appropriately
- hadoopConf = embedInternalSchema(new Configuration(conf),
requiredSchema.internalSchema)
- )
+ val (requiredSchemaBaseFileReaderMerging,
requiredSchemaBaseFileReaderNoMerging) =
+ createMergeOnReadBaseFileReaders(partitionSchema, dataSchema,
requiredSchema, requestedColumns, filters)
val tableState = getTableState
- new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf,
fullSchemaParquetReader, requiredSchemaParquetReader,
- dataSchema, requiredSchema, tableState, mergeType, fileSplits)
+ new HoodieMergeOnReadRDD(
+ sqlContext.sparkContext,
+ config = jobConf,
+ fileReaders = HoodieMergeOnReadBaseFileReaders(
+ fullSchemaFileReader = fullSchemaBaseFileReader,
+ requiredSchemaFileReaderForMerging =
requiredSchemaBaseFileReaderMerging,
+ requiredSchemaFileReaderForNoMerging =
requiredSchemaBaseFileReaderNoMerging
+ ),
+ dataSchema = dataSchema,
+ requiredSchema = requiredSchema,
+ tableState = tableState,
+ mergeType = mergeType,
+ fileSplits = fileSplits)
}
protected override def collectFileSplits(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
@@ -122,6 +146,61 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
HoodieMergeOnReadFileSplit(partitionedBaseFile, logFiles)
}.toList
}
+
+ protected def createMergeOnReadBaseFileReaders(partitionSchema: StructType,
+ dataSchema: HoodieTableSchema,
+ requiredDataSchema:
HoodieTableSchema,
+ requestedColumns:
Array[String],
+ filters: Array[Filter]):
(BaseFileReader, BaseFileReader) = {
+ val requiredSchemaFileReaderMerging = createBaseFileReader(
+ spark = sqlContext.sparkSession,
+ partitionSchema = partitionSchema,
+ dataSchema = dataSchema,
+ requiredSchema = requiredDataSchema,
+ filters = filters,
+ options = optParams,
+ // NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
+ // to configure Parquet reader appropriately
+ hadoopConf = embedInternalSchema(new Configuration(conf),
requiredDataSchema.internalSchema)
+ )
+
+ // Check whether fields required for merging were also requested to be
fetched
+ // by the query:
+ // - In case they were, there's no optimization we could apply here (we
will have
+ // to fetch such fields)
+ // - In case they were not, we will provide 2 separate file-readers
+ // a) One which would be applied to file-groups w/ delta-logs
(merging)
+ // b) One which would be applied to file-groups w/ no delta-logs or
+ // in case query-mode is skipping merging
+ val requiredColumns =
mandatoryFieldsForMerging.map(HoodieAvroUtils.getRootLevelFieldName)
+ if (requiredColumns.forall(requestedColumns.contains)) {
+ (requiredSchemaFileReaderMerging, requiredSchemaFileReaderMerging)
+ } else {
+ val prunedRequiredSchema = {
+ val superfluousColumnNames =
requiredColumns.filterNot(requestedColumns.contains)
+ val prunedStructSchema =
+ StructType(requiredDataSchema.structTypeSchema.fields
+ .filterNot(f => superfluousColumnNames.contains(f.name)))
+
+ HoodieTableSchema(prunedStructSchema,
convertToAvroSchema(prunedStructSchema).toString)
+ }
+
+ val requiredSchemaFileReaderNoMerging = createBaseFileReader(
+ spark = sqlContext.sparkSession,
+ partitionSchema = partitionSchema,
+ dataSchema = dataSchema,
+ requiredSchema = prunedRequiredSchema,
+ filters = filters,
+ options = optParams,
+ // NOTE: We have to fork the Hadoop Config here as Spark will be
modifying it
+ // to configure Parquet reader appropriately
+ hadoopConf = embedInternalSchema(new Configuration(conf),
requiredDataSchema.internalSchema)
+ )
+
+ (requiredSchemaFileReaderMerging, requiredSchemaFileReaderNoMerging)
+ }
+ }
+
}
object MergeOnReadSnapshotRelation {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieRelations.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieRelations.scala
index a8db68edd1..ce9a5d571e 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieRelations.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieRelations.scala
@@ -51,6 +51,4 @@ class TestHoodieRelations {
requiredStructSchema.fields.toSeq
)
}
-
-
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index 4366e8c95f..b24a341d4a 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -54,7 +54,7 @@ class TestParquetColumnProjection extends
SparkClientFunctionalTestHarness with
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key ->
classOf[NonpartitionedKeyGenerator].getName
)
- @Disabled("HUDI-3896")
+ @Disabled("Currently disabled b/c of the fallback to HadoopFsRelation")
@Test
def testBaseFileOnlyViewRelation(): Unit = {
val tablePath = s"$basePath/cow"
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 54dd45f3f5..57c826af92 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieDuplicateKeyException
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.internal.SQLConf
import java.io.File