This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch rc3-patched-for-test
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/rc3-patched-for-test by this push:
     new 63d5f44bc2  Improve readability
63d5f44bc2 is described below

commit 63d5f44bc20a66412c36c6068d46887fdc53e1a3
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Thu Apr 21 12:59:58 2022 -0700

    Improve readability
---
 .../main/scala/org/apache/hudi/BaseFileOnlyRelation.scala     |  6 +++---
 .../src/main/scala/org/apache/hudi/HoodieBaseRelation.scala   | 12 +++++++-----
 .../org/apache/hudi/MergeOnReadIncrementalRelation.scala      |  2 +-
 .../scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala   |  2 +-
 .../apache/hudi/functional/TestParquetColumnProjection.scala  |  4 ++--
 5 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index 3c667d2b42..c57f46a7b6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.hadoop.HoodieROTablePathFilter
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
 import org.apache.spark.sql.hive.orc.OrcFileFormat
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
@@ -54,8 +54,8 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,

   override type FileSplit = HoodieBaseFileSplit

-  override lazy val mandatoryColumns: Seq[String] =
-  // TODO reconcile, record's key shouldn't be mandatory for base-file only relation
+  override lazy val mandatoryFields: Seq[String] =
+  // TODO reconcile, record's key shouldn't be mandatory for base-file only relation
     Seq(recordKeyField)

   override def imbueConfigs(sqlContext: SQLContext): Unit = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index aac57e1bbb..4b7177f4d6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -198,7 +198,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    *
    * @VisibleInTests
    */
-  val mandatoryColumns: Seq[String]
+  val mandatoryFields: Seq[String]
+
+  protected def mandatoryRootFields: Seq[String] =
+    mandatoryFields.map(col => HoodieAvroUtils.getRootLevelFieldName(col))

   protected def timeline: HoodieTimeline =
   // NOTE: We're including compaction here since it's not considering a "commit" operation
@@ -245,7 +248,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       //
       // (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM
       //       PROJECTION
-      val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+      val fetchedColumns: Array[String] = appendMandatoryRootFields(requiredColumns)

       val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
         HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns, internalSchema)
@@ -361,12 +364,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     !SubqueryExpression.hasSubquery(condition)
   }

-  protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
+  protected final def appendMandatoryRootFields(requestedColumns: Array[String]): Array[String] = {
     // For a nested field in mandatory columns, we should first get the root-level field, and then
     // check for any missing column, as the requestedColumns should only contain root-level fields
     // We should only append root-level field as well
-    val missing = mandatoryColumns.map(col => HoodieAvroUtils.getRootLevelFieldName(col))
-      .filter(rootField => !requestedColumns.contains(rootField))
+    val missing = mandatoryRootFields.filter(rootField => !requestedColumns.contains(rootField))
     requestedColumns ++ missing
   }

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 6aa7007851..806a5e371d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -153,7 +153,7 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
     Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
   }

-  override lazy val mandatoryColumns: Seq[String] = {
+  override lazy val mandatoryFields: Seq[String] = {
     // NOTE: This columns are required for Incremental flow to be able to handle the rows properly, even in
     //       cases when no columns are requested to be fetched (for ex, when using {@code count()} API)
     Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index a88eb63036..75bc96624e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -47,7 +47,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,

   override type FileSplit = HoodieMergeOnReadFileSplit

-  override lazy val mandatoryColumns: Seq[String] =
+  override lazy val mandatoryFields: Seq[String] =
     Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())

   protected val mergeType: String = optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index f670450c3e..945d26be3f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -19,7 +19,7 @@ package org.apache.hudi.functional

 import org.apache.avro.Schema
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator}
 import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
@@ -332,7 +332,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
       logWarning(s"Not matching bytes read ($bytesRead)")
     }

-    val readColumns = targetColumns ++ relation.mandatoryColumns
+    val readColumns = targetColumns ++ relation.mandatoryFields
     val (_, projectedStructType, _) = HoodieSparkUtils.getRequiredSchema(tableState.schema, readColumns)

     val row: InternalRow = rows.take(1).head
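
For readers skimming the diff, the standalone Scala sketch below approximates what the renamed helpers do; it is not part of the commit. It stubs HoodieAvroUtils.getRootLevelFieldName with a hypothetical split-on-dot helper, and the object name and sample field names are illustrative only: requested columns keep their original order, and only the root-level names of missing mandatory fields are appended.

// Minimal sketch, not the Hudi implementation.
object MandatoryRootFieldsSketch {

  // Hypothetical stand-in for HoodieAvroUtils.getRootLevelFieldName:
  // "fare.amount" -> "fare", "driver" -> "driver"
  def getRootLevelFieldName(field: String): String = field.split('.').head

  // Mirrors the new `mandatoryRootFields`: mandatory fields reduced to root-level names
  def mandatoryRootFields(mandatoryFields: Seq[String]): Seq[String] =
    mandatoryFields.map(getRootLevelFieldName)

  // Mirrors the renamed `appendMandatoryRootFields`: append only those root-level
  // mandatory fields the caller did not already request, preserving the ordering
  // of the requested columns
  def appendMandatoryRootFields(requestedColumns: Array[String],
                                mandatoryFields: Seq[String]): Array[String] = {
    val missing = mandatoryRootFields(mandatoryFields)
      .filter(rootField => !requestedColumns.contains(rootField))
    requestedColumns ++ missing
  }

  def main(args: Array[String]): Unit = {
    // A nested mandatory field "fare.amount" contributes only its root "fare",
    // which is already requested, so only the record-key field gets appended.
    val result = appendMandatoryRootFields(
      Array("driver", "fare"),
      Seq("_hoodie_record_key", "fare.amount"))
    println(result.mkString(", ")) // driver, fare, _hoodie_record_key
  }
}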