This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit bdc0f9336a1fcd86d8fd216124ae4f7f7a6ea64b
Author: Alexey Kudinkin <ale...@infinilake.com>
AuthorDate: Mon Apr 18 13:06:52 2022 -0700

    [HUDI-3895] Fixing file-partitioning seq for base-file only views to
    make sure we bucket the files efficiently (#5337)
---
 .../org/apache/hudi/BaseFileOnlyRelation.scala | 27 ++++++++++++----------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index 525292da6d..f46b31b036 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -84,21 +84,24 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
   protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {
     val partitions = listLatestBaseFiles(globPaths, partitionFilters, dataFilters)
 
-    val fileSplits = partitions.values.toSeq.flatMap { files =>
-      files.flatMap { file =>
-        // TODO move to adapter
-        // TODO fix, currently assuming parquet as underlying format
-        HoodieDataSourceHelper.splitFiles(
-          sparkSession = sparkSession,
-          file = file,
-          // TODO clarify why this is required
-          partitionValues = getPartitionColumnsAsInternalRow(file)
-        )
+    val fileSplits = partitions.values.toSeq
+      .flatMap { files =>
+        files.flatMap { file =>
+          // TODO fix, currently assuming parquet as underlying format
+          HoodieDataSourceHelper.splitFiles(
+            sparkSession = sparkSession,
+            file = file,
+            partitionValues = getPartitionColumnsAsInternalRow(file)
+          )
+        }
       }
-    }
+      // NOTE: It's important to order the splits in the reverse order of their
+      //       size so that we can subsequently bucket them in an efficient manner
+      .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
 
     val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
 
-    sparkAdapter.getFilePartitions(sparkSession, fileSplits, maxSplitBytes).map(HoodieBaseFileSplit.apply)
+    sparkAdapter.getFilePartitions(sparkSession, fileSplits, maxSplitBytes)
+      .map(HoodieBaseFileSplit.apply)
   }
 }
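
Context on the NOTE in the diff: sparkAdapter.getFilePartitions delegates to
Spark's FilePartition logic, which greedily appends splits to the current
partition and starts a new one as soon as the next split would push it past
maxSplitBytes. That greedy scheme packs a descending-sorted sequence far
better than an arbitrary one, which is why the commit sorts fileSplits first.
Below is a minimal, self-contained Scala sketch of that packing, for
illustration only: SplitInfo and packSplits are hypothetical names, not Hudi
or Spark APIs, and Spark's real implementation also accounts for
spark.sql.files.openCostInBytes, which is omitted here.

import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's PartitionedFile; only the length matters here.
final case class SplitInfo(path: String, length: Long)

// Greedy "next-fit" packing, sketching what FilePartition.getFilePartitions does:
// keep appending splits to the current bucket, and open a new bucket as soon as
// adding the next split would push the bucket past maxSplitBytes.
def packSplits(splits: Seq[SplitInfo], maxSplitBytes: Long): Seq[Seq[SplitInfo]] = {
  val buckets = ArrayBuffer.empty[List[SplitInfo]]
  var current = List.empty[SplitInfo]
  var currentSize = 0L

  splits.foreach { split =>
    if (current.nonEmpty && currentSize + split.length > maxSplitBytes) {
      buckets += current.reverse
      current = Nil
      currentSize = 0L
    }
    current = split :: current
    currentSize += split.length
  }
  if (current.nonEmpty) buckets += current.reverse
  buckets.toSeq
}

// Sorting descending by length first (as the commit now does) seeds buckets with
// large splits and lets small ones fill the remaining headroom. With a 128 MB
// limit and splits of 120, 10, 120, 10, 120, 10 MB, arrival order yields six
// buckets, while descending order yields four: [120], [120], [120], [10, 10, 10].
val splits = Seq(120L, 10L, 120L, 10L, 120L, 10L).zipWithIndex
  .map { case (mb, i) => SplitInfo(s"file-$i.parquet", mb * 1024 * 1024) }
val maxSplitBytes = 128L * 1024 * 1024

packSplits(splits, maxSplitBytes).size                                           // 6
packSplits(splits.sortBy(_.length)(Ordering[Long].reverse), maxSplitBytes).size  // 4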