xushiyan commented on a change in pull request #4877:
URL: https://github.com/apache/hudi/pull/4877#discussion_r824767545



##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
##########
@@ -65,20 +65,6 @@ object HoodieDataSourceHelper extends PredicateHelper {
     }
   }
 
-  /**
-   * Extract the required schema from [[InternalRow]]
-   */
-  def extractRequiredSchema(

Review comment:
      Is this no longer used anywhere?

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -130,22 +158,110 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    * NOTE: DO NOT OVERRIDE THIS METHOD
    */
   override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    // NOTE: In case list of requested columns doesn't contain the Primary Key one, we
+    //       have to add it explicitly so that
+    //          - Merging could be performed correctly
+    //          - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
+    //          Spark still fetches all the rows to execute the query correctly
+    //
+    //       It's okay to return columns that have not been requested by the caller, as those nevertheless will be
+    //       filtered out upstream
+    val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+
+    val (requiredAvroSchema, requiredStructSchema) =
+      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
+
+    val filterExpressions = convertToExpressions(filters)
+    val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)
+
+    val fileSplits = collectFileSplits(partitionFilters, dataFilters)
+
+    val partitionSchema = StructType(Nil)
+    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
+    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString)
+
     // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
     // Please check [[needConversion]] scala-doc for more details
-    doBuildScan(requiredColumns, filters).asInstanceOf[RDD[Row]]
+    composeRDD(fileSplits, partitionSchema, tableSchema, requiredSchema, filters).asInstanceOf[RDD[Row]]
   }
 
-  protected def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow]
+  // TODO scala-doc
+  protected def composeRDD(fileSplits: Seq[FileSplit],
+                           partitionSchema: StructType,
+                           tableSchema: HoodieTableSchema,
+                           requiredSchema: HoodieTableSchema,
+                           filters: Array[Filter]): HoodieUnsafeRDD
+
+  // TODO scala-doc
+  protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit]
+
+  protected def listLatestBaseFiles(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
+    if (globPaths.isEmpty) {
+      val partitionDirs = fileIndex.listFiles(partitionFilters, dataFilters)
+      partitionDirs.map(pd => (getPartitionPath(pd.files.head), pd.files)).toMap
+    } else {
+      val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sparkSession, globPaths)
+      val partitionDirs = inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
+
+      val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
+      val latestBaseFiles = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus)
+
+      latestBaseFiles.groupBy(getPartitionPath)
+    }
+  }
+
+  protected def convertToExpressions(filters: Array[Filter]): Array[Expression] = {
+    val catalystExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)
+
+    val failedExprs = catalystExpressions.zipWithIndex.filter { case (opt, _) => opt.isEmpty }
+    if (failedExprs.nonEmpty) {
+      val failedFilters = failedExprs.map(p => filters(p._2))
+      logWarning(s"Failed to convert Filters into Catalyst expressions (${failedFilters.map(_.toString)})")
+    }
+
+    catalystExpressions.filter(_.isDefined).map(_.get).toArray
+  }
+
+  /**
+   * Checks whether given expression only references partition columns
+   * (and involves no sub-query)
+   */
+  protected def isPartitionPredicate(condition: Expression): Boolean = {
+    // Validates that the provided names both resolve to the same entity
+    val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver
+
+    condition.references.forall { r => partitionColumns.exists(resolvedNameEquals(r.name, _)) } &&
+      !SubqueryExpression.hasSubquery(condition)
+  }
 
   protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
     val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
     requestedColumns ++ missing
   }
+
+  private def getPrecombineFieldProperty: Option[String] =
+    Option(tableConfig.getPreCombineField)
+      .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) match {
+      // NOTE: This is required to compensate for cases when empty string is used to stub
+      //       property value to avoid it being set with the default value
+      // TODO(HUDI-3456) cleanup
+      case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
+      case _ => None
+    }
+
+  private def imbueConfigs(sqlContext: SQLContext): Unit = {
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true")
+  }

Review comment:
      This one was `false` before.

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -130,22 +158,110 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    * NOTE: DO NOT OVERRIDE THIS METHOD
    */
   override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    // NOTE: In case list of requested columns doesn't contain the Primary Key one, we
+    //       have to add it explicitly so that
+    //          - Merging could be performed correctly
+    //          - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
+    //          Spark still fetches all the rows to execute the query correctly
+    //
+    //       It's okay to return columns that have not been requested by the caller, as those nevertheless will be
+    //       filtered out upstream
+    val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+
+    val (requiredAvroSchema, requiredStructSchema) =
+      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
+
+    val filterExpressions = convertToExpressions(filters)
+    val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)
+
+    val fileSplits = collectFileSplits(partitionFilters, dataFilters)
+
+    val partitionSchema = StructType(Nil)
+    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
+    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString)
+
     // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
     // Please check [[needConversion]] scala-doc for more details
-    doBuildScan(requiredColumns, filters).asInstanceOf[RDD[Row]]
+    composeRDD(fileSplits, partitionSchema, tableSchema, requiredSchema, filters).asInstanceOf[RDD[Row]]
   }
 
-  protected def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow]
+  // TODO scala-doc
+  protected def composeRDD(fileSplits: Seq[FileSplit],
+                           partitionSchema: StructType,
+                           tableSchema: HoodieTableSchema,
+                           requiredSchema: HoodieTableSchema,
+                           filters: Array[Filter]): HoodieUnsafeRDD
+
+  // TODO scala-doc

Review comment:
      Add the scala-doc now?
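
      For example, a sketch along these lines could work as a starting point (wording and `@param` descriptions are only a suggestion, not taken from the PR):

      ```scala
      /**
       * Collects the [[FileSplit]]s this relation has to scan, after applying the provided
       * partition and data filters.
       *
       * @param partitionFilters filters referencing only partition columns
       * @param dataFilters      filters referencing data columns
       * @return file splits to be scanned by [[composeRDD]]
       */
      protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit]
      ```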

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -130,22 +158,110 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    * NOTE: DO NOT OVERRIDE THIS METHOD
    */
   override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    // NOTE: In case list of requested columns doesn't contain the Primary Key one, we
+    //       have to add it explicitly so that
+    //          - Merging could be performed correctly
+    //          - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
+    //          Spark still fetches all the rows to execute the query correctly
+    //
+    //       It's okay to return columns that have not been requested by the caller, as those nevertheless will be
+    //       filtered out upstream
+    val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+
+    val (requiredAvroSchema, requiredStructSchema) =
+      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
+
+    val filterExpressions = convertToExpressions(filters)
+    val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)
+
+    val fileSplits = collectFileSplits(partitionFilters, dataFilters)
+
+    val partitionSchema = StructType(Nil)
+    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
+    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString)
+
     // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
     // Please check [[needConversion]] scala-doc for more details
-    doBuildScan(requiredColumns, filters).asInstanceOf[RDD[Row]]
+    composeRDD(fileSplits, partitionSchema, tableSchema, requiredSchema, filters).asInstanceOf[RDD[Row]]
   }
 
-  protected def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow]
+  // TODO scala-doc
+  protected def composeRDD(fileSplits: Seq[FileSplit],
+                           partitionSchema: StructType,
+                           tableSchema: HoodieTableSchema,
+                           requiredSchema: HoodieTableSchema,
+                           filters: Array[Filter]): HoodieUnsafeRDD
+
+  // TODO scala-doc
+  protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit]
+
+  protected def listLatestBaseFiles(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
+    if (globPaths.isEmpty) {
+      val partitionDirs = fileIndex.listFiles(partitionFilters, dataFilters)
+      partitionDirs.map(pd => (getPartitionPath(pd.files.head), pd.files)).toMap
+    } else {
+      val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sparkSession, globPaths)
+      val partitionDirs = inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
+
+      val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
+      val latestBaseFiles = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus)
+
+      latestBaseFiles.groupBy(getPartitionPath)
+    }
+  }
+
+  protected def convertToExpressions(filters: Array[Filter]): Array[Expression] = {
+    val catalystExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)
+
+    val failedExprs = catalystExpressions.zipWithIndex.filter { case (opt, _) => opt.isEmpty }
+    if (failedExprs.nonEmpty) {
+      val failedFilters = failedExprs.map(p => filters(p._2))
+      logWarning(s"Failed to convert Filters into Catalyst expressions (${failedFilters.map(_.toString)})")
+    }
+
+    catalystExpressions.filter(_.isDefined).map(_.get).toArray
+  }
+
+  /**
+   * Checks whether given expression only references partition columns
+   * (and involves no sub-query)
+   */
+  protected def isPartitionPredicate(condition: Expression): Boolean = {
+    // Validates that the provided names both resolve to the same entity
+    val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver
+
+    condition.references.forall { r => partitionColumns.exists(resolvedNameEquals(r.name, _)) } &&
+      !SubqueryExpression.hasSubquery(condition)
+  }
 
   protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
     val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
     requestedColumns ++ missing
   }
+
+  private def getPrecombineFieldProperty: Option[String] =
+    Option(tableConfig.getPreCombineField)
+      .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) match {
+      // NOTE: This is required to compensate for cases when empty string is used to stub
+      //       property value to avoid it being set with the default value
+      // TODO(HUDI-3456) cleanup
+      case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
+      case _ => None
+    }
+
+  private def imbueConfigs(sqlContext: SQLContext): Unit = {
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true")
+  }
 }
 
 object HoodieBaseRelation {
 
-  def isMetadataTable(metaClient: HoodieTableMetaClient) =
+  def getPartitionPath(fileStatus: FileStatus): Path =

Review comment:
      Better to call it `getParentPath`? `getParent` does not necessarily equal the partition path, right? It depends on the input.
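
      Something like the sketch below, for example. (The body is a guess based on the method name; I'm assuming it simply takes the parent directory of the file, which is not necessarily the partition path.)

      ```scala
      // Returns the parent directory of the given file; note this is not guaranteed
      // to be the partition path, that depends on the input
      def getParentPath(fileStatus: FileStatus): Path = fileStatus.getPath.getParent
      ```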

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
##########
@@ -105,13 +104,15 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
    * Fetch list of latest base files w/ corresponding log files, after performing
    * partition pruning
    *
+   * TODO unify w/ HoodieFileIndex#listFiles
+   *
    * @param partitionFilters partition column filters
    * @return mapping from string partition paths to its base/log files
    */
   def listFileSlices(partitionFilters: Seq[Expression]): Map[String, Seq[FileSlice]] = {
     // Prune the partition path by the partition filters
     val prunedPartitions = HoodieCommonUtils.prunePartition(partitionSchema,
-      cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters)

Review comment:
       👍 

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -130,22 +158,110 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    * NOTE: DO NOT OVERRIDE THIS METHOD
    */
   override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    // NOTE: In case list of requested columns doesn't contain the Primary Key one, we
+    //       have to add it explicitly so that
+    //          - Merging could be performed correctly
+    //          - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
+    //          Spark still fetches all the rows to execute the query correctly
+    //
+    //       It's okay to return columns that have not been requested by the caller, as those nevertheless will be
+    //       filtered out upstream
+    val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+
+    val (requiredAvroSchema, requiredStructSchema) =
+      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
+
+    val filterExpressions = convertToExpressions(filters)
+    val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)
+
+    val fileSplits = collectFileSplits(partitionFilters, dataFilters)
+
+    val partitionSchema = StructType(Nil)
+    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
+    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString)
+
     // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
     // Please check [[needConversion]] scala-doc for more details
-    doBuildScan(requiredColumns, filters).asInstanceOf[RDD[Row]]
+    composeRDD(fileSplits, partitionSchema, tableSchema, requiredSchema, filters).asInstanceOf[RDD[Row]]
   }
 
-  protected def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow]
+  // TODO scala-doc

Review comment:
      Add the scala-doc now?
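
      For example, something along these lines (wording and `@param` descriptions are only a suggestion, not taken from the PR):

      ```scala
      /**
       * Composes an RDD scanning the provided file splits and producing rows in the requested schema.
       *
       * @param fileSplits      file splits to be read
       * @param partitionSchema schema of the partition columns
       * @param tableSchema     full table schema (Catalyst and Avro representations)
       * @param requiredSchema  schema of the columns actually requested by the query (plus mandatory columns)
       * @param filters         data filters that may optionally be pushed down into the reader
       * @return [[HoodieUnsafeRDD]] holding the resulting rows
       */
      protected def composeRDD(fileSplits: Seq[FileSplit],
                               partitionSchema: StructType,
                               tableSchema: HoodieTableSchema,
                               requiredSchema: HoodieTableSchema,
                               filters: Array[Filter]): HoodieUnsafeRDD
      ```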




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
