This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit bdc0f9336a1fcd86d8fd216124ae4f7f7a6ea64b
Author: Alexey Kudinkin <ale...@infinilake.com>
AuthorDate: Mon Apr 18 13:06:52 2022 -0700

    [HUDI-3895] Fixing file-partitioning seq for base-file only views to make sure we bucket the files efficiently (#5337)
---
 .../org/apache/hudi/BaseFileOnlyRelation.scala     | 27 ++++++++++++----------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index 525292da6d..f46b31b036 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -84,21 +84,24 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
 
   protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {
     val partitions = listLatestBaseFiles(globPaths, partitionFilters, dataFilters)
-    val fileSplits = partitions.values.toSeq.flatMap { files =>
-      files.flatMap { file =>
-        // TODO move to adapter
-        // TODO fix, currently assuming parquet as underlying format
-        HoodieDataSourceHelper.splitFiles(
-          sparkSession = sparkSession,
-          file = file,
-          // TODO clarify why this is required
-          partitionValues = getPartitionColumnsAsInternalRow(file)
-        )
+    val fileSplits = partitions.values.toSeq
+      .flatMap { files =>
+        files.flatMap { file =>
+          // TODO fix, currently assuming parquet as underlying format
+          HoodieDataSourceHelper.splitFiles(
+            sparkSession = sparkSession,
+            file = file,
+            partitionValues = getPartitionColumnsAsInternalRow(file)
+          )
+        }
       }
-    }
+      // NOTE: It's important to order the splits in the reverse order of their
+      //       size so that we can subsequently bucket them in an efficient manner
+      .sortBy(_.length)(implicitly[Ordering[Long]].reverse)
 
     val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
 
-    sparkAdapter.getFilePartitions(sparkSession, fileSplits, maxSplitBytes).map(HoodieBaseFileSplit.apply)
+    sparkAdapter.getFilePartitions(sparkSession, fileSplits, maxSplitBytes)
+      .map(HoodieBaseFileSplit.apply)
   }
 }
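
For context, here is a minimal, self-contained Scala sketch of why the descending sort matters. Split and bucketBySize below are hypothetical stand-ins for HoodieBaseFileSplit and the greedy packing performed by sparkAdapter.getFilePartitions (which in Spark also charges a per-file open cost, omitted here); this illustrates the technique under those assumptions and is not the actual Hudi/Spark API.

// Hypothetical stand-in for HoodieBaseFileSplit: just a path and a size.
case class Split(path: String, length: Long)

// Greedy packing in the style of Spark's FilePartition planning: walk the
// splits in the given order and close the current bucket whenever adding
// the next split would exceed maxSplitBytes.
def bucketBySize(splits: Seq[Split], maxSplitBytes: Long): Seq[Seq[Split]] = {
  val buckets = scala.collection.mutable.ArrayBuffer.empty[Seq[Split]]
  val current = scala.collection.mutable.ArrayBuffer.empty[Split]
  var currentSize = 0L
  splits.foreach { split =>
    if (current.nonEmpty && currentSize + split.length > maxSplitBytes) {
      buckets += current.toSeq
      current.clear()
      currentSize = 0L
    }
    current += split
    currentSize += split.length
  }
  if (current.nonEmpty) buckets += current.toSeq
  buckets.toSeq
}

val splits = Seq(Split("a", 10L), Split("b", 120L), Split("c", 30L), Split("d", 110L))

// In raw listing order the greedy pass closes a bucket at every step and
// produces four buckets: [a], [b], [c], [d].
val unsorted = bucketBySize(splits, maxSplitBytes = 128L)

// Sorted largest-first, as this patch does, the big splits seed the buckets
// and the small ones fill the remaining headroom: [b], [d], [c, a] -- three
// buckets instead of four.
val sorted = bucketBySize(
  splits.sortBy(_.length)(implicitly[Ordering[Long]].reverse),
  maxSplitBytes = 128L)

Sorting largest-first before a greedy size-capped pass is the classic first-fit-decreasing heuristic for bin packing, which is why the NOTE in the patch calls the ordering important for bucketing the splits efficiently.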
