This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch rc3-patched-for-test
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/rc3-patched-for-test by this push:
     new 63d5f44bc2 Improve readability
63d5f44bc2 is described below

commit 63d5f44bc20a66412c36c6068d46887fdc53e1a3
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Thu Apr 21 12:59:58 2022 -0700

    Improve readability
---
 .../main/scala/org/apache/hudi/BaseFileOnlyRelation.scala    |  6 +++---
 .../src/main/scala/org/apache/hudi/HoodieBaseRelation.scala  | 12 +++++++-----
 .../org/apache/hudi/MergeOnReadIncrementalRelation.scala     |  2 +-
 .../scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala  |  2 +-
 .../apache/hudi/functional/TestParquetColumnProjection.scala |  4 ++--
 5 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index 3c667d2b42..c57f46a7b6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.hadoop.HoodieROTablePathFilter
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
 import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
@@ -54,8 +54,8 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
 
   override type FileSplit = HoodieBaseFileSplit
 
-  override lazy val mandatoryColumns: Seq[String] =
-    // TODO reconcile, record's key shouldn't be mandatory for base-file only relation
+  override lazy val mandatoryFields: Seq[String] =
+  // TODO reconcile, record's key shouldn't be mandatory for base-file only relation
     Seq(recordKeyField)
 
   override def imbueConfigs(sqlContext: SQLContext): Unit = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index aac57e1bbb..4b7177f4d6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -198,7 +198,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    *
    * @VisibleInTests
    */
-  val mandatoryColumns: Seq[String]
+  val mandatoryFields: Seq[String]
+
+  protected def mandatoryRootFields: Seq[String] =
+    mandatoryFields.map(col => HoodieAvroUtils.getRootLevelFieldName(col))
 
   protected def timeline: HoodieTimeline =
  // NOTE: We're including compaction here since it's not considering a "commit" operation
@@ -245,7 +248,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     //
     // (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM
     //       PROJECTION
-    val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+    val fetchedColumns: Array[String] = appendMandatoryRootFields(requiredColumns)
 
     val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
       HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns, internalSchema)
@@ -361,12 +364,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       !SubqueryExpression.hasSubquery(condition)
   }
 
-  protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
+  protected final def appendMandatoryRootFields(requestedColumns: Array[String]): Array[String] = {
     // For a nested field in mandatory columns, we should first get the root-level field, and then
     // check for any missing column, as the requestedColumns should only contain root-level fields
     // We should only append root-level field as well
-    val missing = mandatoryColumns.map(col => HoodieAvroUtils.getRootLevelFieldName(col))
-      .filter(rootField => !requestedColumns.contains(rootField))
+    val missing = mandatoryRootFields.filter(rootField => !requestedColumns.contains(rootField))
     requestedColumns ++ missing
   }
 
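To make the new helper easier to follow at a glance, here is a minimal, self-contained Scala sketch of the same idea. It is not Hudi's implementation: the object name is invented and rootLevelFieldName is a hypothetical stand-in for HoodieAvroUtils.getRootLevelFieldName. It only illustrates the behaviour the patch relies on: nested mandatory fields are reduced to their root-level names, and only the missing ones are appended after the requested columns, so the requested column order is never disturbed.

    object MandatoryRootFieldsSketch {
      // Hypothetical stand-in for HoodieAvroUtils.getRootLevelFieldName,
      // e.g. "meta.recordKey" -> "meta"
      private def rootLevelFieldName(field: String): String =
        field.split('.').head

      def appendMandatoryRootFields(requestedColumns: Array[String],
                                    mandatoryFields: Seq[String]): Array[String] = {
        val mandatoryRootFields = mandatoryFields.map(rootLevelFieldName)
        // Append only the root-level fields that are missing, keeping the
        // requested columns in their original order
        val missing = mandatoryRootFields.filterNot(requestedColumns.contains)
        requestedColumns ++ missing
      }

      def main(args: Array[String]): Unit = {
        val requested = Array("name", "age")
        val mandatory = Seq("meta.recordKey", "name")
        // Prints: name, age, meta
        println(appendMandatoryRootFields(requested, mandatory).mkString(", "))
      }
    }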
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 6aa7007851..806a5e371d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -153,7 +153,7 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
     Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
   }
 
-  override lazy val mandatoryColumns: Seq[String] = {
+  override lazy val mandatoryFields: Seq[String] = {
    // NOTE: This columns are required for Incremental flow to be able to handle the rows properly, even in
    //       cases when no columns are requested to be fetched (for ex, when using {@code count()} API)
    Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
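As a brief aside on why these fields are mandatory here: the incremental relation's time-range filters (the isNotNullFilter, largerThanFilter, and lessThanFilter seen in the surrounding context) reference the commit-time metadata column, so that column must be fetched even when the query projects no columns at all (for example count()). A hedged sketch using Spark's public Filter classes; the instant-time values are made up and the column name is assumed to match HoodieRecord.COMMIT_TIME_METADATA_FIELD:

    import org.apache.spark.sql.sources.{Filter, GreaterThan, IsNotNull, LessThanOrEqual}

    object IncrementalFiltersSketch {
      // Assumed value of HoodieRecord.COMMIT_TIME_METADATA_FIELD
      val commitTimeField = "_hoodie_commit_time"

      // Filters analogous to isNotNullFilter / largerThanFilter / lessThanFilter
      def incrementalFilters(beginInstant: String, endInstant: String): Seq[Filter] =
        Seq(
          IsNotNull(commitTimeField),
          GreaterThan(commitTimeField, beginInstant),
          LessThanOrEqual(commitTimeField, endInstant))
    }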
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index a88eb63036..75bc96624e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -47,7 +47,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
 
   override type FileSplit = HoodieMergeOnReadFileSplit
 
-  override lazy val mandatoryColumns: Seq[String] =
+  override lazy val mandatoryFields: Seq[String] =
     Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
 
  protected val mergeType: String = optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index f670450c3e..945d26be3f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -19,7 +19,7 @@ package org.apache.hudi.functional
 
 import org.apache.avro.Schema
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator}
 import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
@@ -332,7 +332,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
         logWarning(s"Not matching bytes read ($bytesRead)")
       }
 
-      val readColumns = targetColumns ++ relation.mandatoryColumns
+      val readColumns = targetColumns ++ relation.mandatoryFields
      val (_, projectedStructType, _) = HoodieSparkUtils.getRequiredSchema(tableState.schema, readColumns)
 
       val row: InternalRow = rows.take(1).head
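For completeness, a stripped-down sketch of the check this test fragment performs: the columns handed to the reader are the requested columns plus the relation's mandatory fields, and all of them should survive the projection. The column names are hypothetical and a plain StructType stands in for the schema HoodieSparkUtils.getRequiredSchema would return:

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    object ProjectionCheckSketch {
      def main(args: Array[String]): Unit = {
        val targetColumns   = Seq("name", "age")        // hypothetical query projection
        val mandatoryFields = Seq("_hoodie_record_key") // e.g. a snapshot relation's record key field
        val readColumns     = targetColumns ++ mandatoryFields

        // Stand-in for the projected schema produced by getRequiredSchema
        val projectedStructType = StructType(readColumns.map(StructField(_, StringType)))

        // Every requested and mandatory column should be present after projection
        assert(projectedStructType.fieldNames.toSet == readColumns.toSet)
      }
    }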
