xushiyan commented on a change in pull request #4877:
URL: https://github.com/apache/hudi/pull/4877#discussion_r824767545
##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
##########
@@ -65,20 +65,6 @@ object HoodieDataSourceHelper extends PredicateHelper {
     }
   }
 
-  /**
-   * Extract the required schema from [[InternalRow]]
-   */
-  def extractRequiredSchema(

Review comment:
       Is this not used anymore?
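For context on the question above: the removed helper's job, deriving a pruned read schema, is now handled while building the scan (see `HoodieSparkUtils.getRequiredSchema` in the next hunk). Below is a minimal, self-contained sketch of that kind of column pruning on the Catalyst side; the field names are made up and this is not the PR's implementation.

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object PruneSchemaExample extends App {
  // Made-up table schema, standing in for HoodieBaseRelation's tableStructSchema
  val tableStructSchema = StructType(Seq(
    StructField("_hoodie_record_key", StringType),
    StructField("uuid", StringType),
    StructField("age", IntegerType)
  ))

  // Keep only the fetched columns, preserving the table schema's field order
  def pruneStructSchema(schema: StructType, fetchedColumns: Array[String]): StructType =
    StructType(schema.fields.filter(f => fetchedColumns.contains(f.name)))

  println(pruneStructSchema(tableStructSchema, Array("uuid", "_hoodie_record_key")).fieldNames.mkString(", "))
  // prints: _hoodie_record_key, uuid
}
```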
##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -130,22 +158,110 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    * NOTE: DO NOT OVERRIDE THIS METHOD
    */
   override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    // NOTE: In case list of requested columns doesn't contain the Primary Key one, we
+    //       have to add it explicitly so that
+    //          - Merging could be performed correctly
+    //          - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
+    //            Spark still fetches all the rows to execute the query correctly
+    //
+    //       It's okay to return columns that have not been requested by the caller, as those nevertheless will be
+    //       filtered out upstream
+    val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+
+    val (requiredAvroSchema, requiredStructSchema) =
+      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
+
+    val filterExpressions = convertToExpressions(filters)
+    val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)
+
+    val fileSplits = collectFileSplits(partitionFilters, dataFilters)
+
+    val partitionSchema = StructType(Nil)
+    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
+    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString)
+
     // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
     // Please check [[needConversion]] scala-doc for more details
-    doBuildScan(requiredColumns, filters).asInstanceOf[RDD[Row]]
+    composeRDD(fileSplits, partitionSchema, tableSchema, requiredSchema, filters).asInstanceOf[RDD[Row]]
   }
 
-  protected def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow]
+  // TODO scala-doc
+  protected def composeRDD(fileSplits: Seq[FileSplit],
+                           partitionSchema: StructType,
+                           tableSchema: HoodieTableSchema,
+                           requiredSchema: HoodieTableSchema,
+                           filters: Array[Filter]): HoodieUnsafeRDD
+
+  // TODO scala-doc
+  protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit]
+
+  protected def listLatestBaseFiles(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
+    if (globPaths.isEmpty) {
+      val partitionDirs = fileIndex.listFiles(partitionFilters, dataFilters)
+      partitionDirs.map(pd => (getPartitionPath(pd.files.head), pd.files)).toMap
+    } else {
+      val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sparkSession, globPaths)
+      val partitionDirs = inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
+
+      val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
+      val latestBaseFiles = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus)
+
+      latestBaseFiles.groupBy(getPartitionPath)
+    }
+  }
+
+  protected def convertToExpressions(filters: Array[Filter]): Array[Expression] = {
+    val catalystExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)
+
+    val failedExprs = catalystExpressions.zipWithIndex.filter { case (opt, _) => opt.isEmpty }
+    if (failedExprs.nonEmpty) {
+      val failedFilters = failedExprs.map(p => filters(p._2))
+      logWarning(s"Failed to convert Filters into Catalyst expressions (${failedFilters.map(_.toString)})")
+    }
+
+    catalystExpressions.filter(_.isDefined).map(_.get).toArray
+  }
+
+  /**
+   * Checks whether given expression only references partition columns
+   * (and involves no sub-query)
+   */
+  protected def isPartitionPredicate(condition: Expression): Boolean = {
+    // Validates that the provided names both resolve to the same entity
+    val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver
+
+    condition.references.forall { r => partitionColumns.exists(resolvedNameEquals(r.name, _)) } &&
+      !SubqueryExpression.hasSubquery(condition)
+  }
 
   protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
     val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
     requestedColumns ++ missing
   }
+
+  private def getPrecombineFieldProperty: Option[String] =
+    Option(tableConfig.getPreCombineField)
+      .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) match {
+      // NOTE: This is required to compensate for cases when empty string is used to stub
+      //       property value to avoid it being set with the default value
+      // TODO(HUDI-3456) cleanup
+      case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
+      case _ => None
+    }
+
+  private def imbueConfigs(sqlContext: SQLContext): Unit = {
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true")
+  }

Review comment:
       This one was set to `false` before.
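To illustrate the concern above, here is a hedged alternative sketch (not what the PR does) that would force these Parquet settings only when the user has not set them explicitly, so a session that deliberately keeps `spark.sql.parquet.enableVectorizedReader=false` would not be silently overridden:

```scala
import org.apache.spark.sql.SQLContext

object ImbueConfigsSketch {
  // Sketch only: respect user-provided session values instead of unconditionally overriding them
  def imbueConfigsIfUnset(sqlContext: SQLContext): Unit = {
    val conf = sqlContext.sparkSession.sessionState.conf
    Seq(
      "spark.sql.parquet.filterPushdown" -> "true",
      "spark.sql.parquet.recordLevelFilter.enabled" -> "true",
      "spark.sql.parquet.enableVectorizedReader" -> "true"
    ).foreach { case (key, value) =>
      // Only set the conf when the user has not configured it explicitly
      if (!conf.contains(key)) conf.setConfString(key, value)
    }
  }
}
```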
##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -130,22 +158,110 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    * NOTE: DO NOT OVERRIDE THIS METHOD
    */
   override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    // NOTE: In case list of requested columns doesn't contain the Primary Key one, we
+    //       have to add it explicitly so that
+    //          - Merging could be performed correctly
+    //          - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
+    //            Spark still fetches all the rows to execute the query correctly
+    //
+    //       It's okay to return columns that have not been requested by the caller, as those nevertheless will be
+    //       filtered out upstream
+    val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+
+    val (requiredAvroSchema, requiredStructSchema) =
+      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
+
+    val filterExpressions = convertToExpressions(filters)
+    val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)
+
+    val fileSplits = collectFileSplits(partitionFilters, dataFilters)
+
+    val partitionSchema = StructType(Nil)
+    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
+    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString)
+
     // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
     // Please check [[needConversion]] scala-doc for more details
-    doBuildScan(requiredColumns, filters).asInstanceOf[RDD[Row]]
+    composeRDD(fileSplits, partitionSchema, tableSchema, requiredSchema, filters).asInstanceOf[RDD[Row]]
   }
 
-  protected def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow]
+  // TODO scala-doc
+  protected def composeRDD(fileSplits: Seq[FileSplit],
+                           partitionSchema: StructType,
+                           tableSchema: HoodieTableSchema,
+                           requiredSchema: HoodieTableSchema,
+                           filters: Array[Filter]): HoodieUnsafeRDD
+
+  // TODO scala-doc

Review comment:
       Can we add the scala-doc now?
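Since the hunk quoted above (and the earlier one) centres on appending mandatory columns before schema pruning, here is a tiny self-contained illustration of that step. The mandatory column names are assumed for the example, not taken from the PR:

```scala
object AppendMandatoryColumnsExample extends App {
  // Assumed mandatory columns (e.g. record key and precombine field); illustrative only
  val mandatoryColumns = Seq("_hoodie_record_key", "ts")

  // Same logic as appendMandatoryColumns in the hunk above
  def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
    val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
    requestedColumns ++ missing
  }

  println(appendMandatoryColumns(Array("uuid", "ts")).mkString(", "))
  // prints: uuid, ts, _hoodie_record_key

  println(appendMandatoryColumns(Array.empty[String]).mkString(", "))
  // prints: _hoodie_record_key, ts  (covers the count() case mentioned in the NOTE)
}
```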
##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -130,22 +158,110 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    * NOTE: DO NOT OVERRIDE THIS METHOD
    */
   override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    // NOTE: In case list of requested columns doesn't contain the Primary Key one, we
+    //       have to add it explicitly so that
+    //          - Merging could be performed correctly
+    //          - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
+    //            Spark still fetches all the rows to execute the query correctly
+    //
+    //       It's okay to return columns that have not been requested by the caller, as those nevertheless will be
+    //       filtered out upstream
+    val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+
+    val (requiredAvroSchema, requiredStructSchema) =
+      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
+
+    val filterExpressions = convertToExpressions(filters)
+    val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)
+
+    val fileSplits = collectFileSplits(partitionFilters, dataFilters)
+
+    val partitionSchema = StructType(Nil)
+    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
+    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString)
+
     // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
     // Please check [[needConversion]] scala-doc for more details
-    doBuildScan(requiredColumns, filters).asInstanceOf[RDD[Row]]
+    composeRDD(fileSplits, partitionSchema, tableSchema, requiredSchema, filters).asInstanceOf[RDD[Row]]
   }
 
-  protected def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow]
+  // TODO scala-doc
+  protected def composeRDD(fileSplits: Seq[FileSplit],
+                           partitionSchema: StructType,
+                           tableSchema: HoodieTableSchema,
+                           requiredSchema: HoodieTableSchema,
+                           filters: Array[Filter]): HoodieUnsafeRDD
+
+  // TODO scala-doc
+  protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit]
+
+  protected def listLatestBaseFiles(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[Path, Seq[FileStatus]] = {
+    if (globPaths.isEmpty) {
+      val partitionDirs = fileIndex.listFiles(partitionFilters, dataFilters)
+      partitionDirs.map(pd => (getPartitionPath(pd.files.head), pd.files)).toMap
+    } else {
+      val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sparkSession, globPaths)
+      val partitionDirs = inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
+
+      val fsView = new HoodieTableFileSystemView(metaClient, timeline, partitionDirs.flatMap(_.files).toArray)
+      val latestBaseFiles = fsView.getLatestBaseFiles.iterator().asScala.toList.map(_.getFileStatus)
+
+      latestBaseFiles.groupBy(getPartitionPath)
+    }
+  }
+
+  protected def convertToExpressions(filters: Array[Filter]): Array[Expression] = {
+    val catalystExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema)
+
+    val failedExprs = catalystExpressions.zipWithIndex.filter { case (opt, _) => opt.isEmpty }
+    if (failedExprs.nonEmpty) {
+      val failedFilters = failedExprs.map(p => filters(p._2))
+      logWarning(s"Failed to convert Filters into Catalyst expressions (${failedFilters.map(_.toString)})")
+    }
+
+    catalystExpressions.filter(_.isDefined).map(_.get).toArray
+  }
+
+  /**
+   * Checks whether given expression only references partition columns
+   * (and involves no sub-query)
+   */
+  protected def isPartitionPredicate(condition: Expression): Boolean = {
+    // Validates that the provided names both resolve to the same entity
+    val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver
+
+    condition.references.forall { r => partitionColumns.exists(resolvedNameEquals(r.name, _)) } &&
+      !SubqueryExpression.hasSubquery(condition)
+  }
 
   protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
     val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
     requestedColumns ++ missing
   }
+
+  private def getPrecombineFieldProperty: Option[String] =
+    Option(tableConfig.getPreCombineField)
+      .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) match {
+      // NOTE: This is required to compensate for cases when empty string is used to stub
+      //       property value to avoid it being set with the default value
+      // TODO(HUDI-3456) cleanup
+      case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)
+      case _ => None
+    }
+
+  private def imbueConfigs(sqlContext: SQLContext): Unit = {
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true")
+  }
 }
 
 object HoodieBaseRelation {
 
-  def isMetadataTable(metaClient: HoodieTableMetaClient) =
+  def getPartitionPath(fileStatus: FileStatus): Path =

Review comment:
       Better to call it `getParentPath`? `getParent` is not necessarily the partition path, right? It depends on the input.
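A small, self-contained example of the point being made (the path below is made up): `Path.getParent` only yields the partition path when base files sit directly under the partition directory.

```scala
import org.apache.hadoop.fs.Path

object ParentPathExample extends App {
  val baseFile = new Path("/warehouse/hudi_table/dt=2022-03-15/part-0001.parquet")
  // The parent directory happens to be the partition path here, but in general it is just the parent
  println(baseFile.getParent) // /warehouse/hudi_table/dt=2022-03-15
}
```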
##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
##########
@@ -105,13 +104,15 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
    * Fetch list of latest base files w/ corresponding log files, after performing
    * partition pruning
    *
+   * TODO unify w/ HoodieFileIndex#listFiles
+   *
    * @param partitionFilters partition column filters
    * @return mapping from string partition paths to its base/log files
    */
   def listFileSlices(partitionFilters: Seq[Expression]): Map[String, Seq[FileSlice]] = {
     // Prune the partition path by the partition filters
     val prunedPartitions = HoodieCommonUtils.prunePartition(partitionSchema,
-      cachedAllInputFileSlices.asScala.keys.toSeq, partitionFilters)

Review comment:
       👍

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -130,22 +158,110 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    * NOTE: DO NOT OVERRIDE THIS METHOD
    */
   override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    // NOTE: In case list of requested columns doesn't contain the Primary Key one, we
+    //       have to add it explicitly so that
+    //          - Merging could be performed correctly
+    //          - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
+    //            Spark still fetches all the rows to execute the query correctly
+    //
+    //       It's okay to return columns that have not been requested by the caller, as those nevertheless will be
+    //       filtered out upstream
+    val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+
+    val (requiredAvroSchema, requiredStructSchema) =
+      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
+
+    val filterExpressions = convertToExpressions(filters)
+    val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate)
+
+    val fileSplits = collectFileSplits(partitionFilters, dataFilters)
+
+    val partitionSchema = StructType(Nil)
+    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
+    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString)
+
     // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
     // Please check [[needConversion]] scala-doc for more details
-    doBuildScan(requiredColumns, filters).asInstanceOf[RDD[Row]]
+    composeRDD(fileSplits, partitionSchema, tableSchema, requiredSchema, filters).asInstanceOf[RDD[Row]]
   }
 
-  protected def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow]
+  // TODO scala-doc

Review comment:
       Can we add the scala-doc now?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org