This is an automated email from the ASF dual-hosted git repository. xushiyan pushed a commit to branch rc3-patched-for-test in repository https://gitbox.apache.org/repos/asf/hudi.git
commit a4b2ff77f10222cac0d71bca778140c4dda1eb11
Author: Alexey Kudinkin <ale...@infinilake.com>
AuthorDate: Wed Apr 20 14:44:46 2022 -0700

    Extracted `int96RebaseMode`, `datetimeRebaseMode` into `Spark32DataSourceUtils`
---
 .../parquet/Spark32DataSourceUtils.scala | 77 ++++++++++++++++++++++
 .../parquet/Spark32HoodieParquetFileFormat.scala | 56 ++--------------
 2 files changed, 82 insertions(+), 51 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala
new file mode 100644
index 0000000000..6d1c76380f
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32DataSourceUtils.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy +import org.apache.spark.util.Utils + +object Spark32DataSourceUtils { + + /** + * NOTE: This method was copied from Spark 3.2.0, and is required to maintain runtime + * compatibility against Spark 3.2.0 + */ + // scalastyle:off + def int96RebaseMode(lookupFileMeta: String => String, + modeByConfig: String): LegacyBehaviorPolicy.Value = { + if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") { + return LegacyBehaviorPolicy.CORRECTED + } + // If there is no version, we return the mode specified by the config. + Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version => + // Files written by Spark 3.0 and earlier follow the legacy hybrid calendar and we need to + // rebase the INT96 timestamp values. + // Files written by Spark 3.1 and latter may also need the rebase if they were written with + // the "LEGACY" rebase mode. + if (version < "3.1.0" || lookupFileMeta("org.apache.spark.legacyINT96") != null) { + LegacyBehaviorPolicy.LEGACY + } else { + LegacyBehaviorPolicy.CORRECTED + } + }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig)) + } + // scalastyle:on + + /** + * NOTE: This method was copied from Spark 3.2.0, and is required to maintain runtime + * compatibility against Spark 3.2.0 + */ + // scalastyle:off + def datetimeRebaseMode(lookupFileMeta: String => String, + modeByConfig: String): LegacyBehaviorPolicy.Value = { + if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") { + return LegacyBehaviorPolicy.CORRECTED + } + // If there is no version, we return the mode specified by the config. 
+ Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version => + // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to + // rebase the datetime values. + // Files written by Spark 3.0 and latter may also need the rebase if they were written with + // the "LEGACY" rebase mode. + if (version < "3.0.0" || lookupFileMeta("org.apache.spark.legacyDateTime") != null) { + LegacyBehaviorPolicy.LEGACY + } else { + LegacyBehaviorPolicy.CORRECTED + } + }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig)) + } + // scalastyle:on + +} diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala index dfeedd7ae4..99cb83cf51 100644 --- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala @@ -181,7 +181,7 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo } else { // Spark 3.2.0 val datetimeRebaseMode = - Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) + Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) createParquetFilters( parquetSchema, pushDownDate, @@ -272,9 +272,9 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo } else { // Spark 3.2.0 val datetimeRebaseMode = - Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) + Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, 
datetimeRebaseModeInRead) val int96RebaseMode = - Spark32HoodieParquetFileFormat.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) + Spark32DataSourceUtils.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) createVectorizedParquetRecordReader( convertTz.orNull, datetimeRebaseMode.toString, @@ -334,9 +334,9 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo int96RebaseSpec) } else { val datetimeRebaseMode = - Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) + Spark32DataSourceUtils.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) val int96RebaseMode = - Spark32HoodieParquetFileFormat.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) + Spark32DataSourceUtils.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) createParquetReadSupport( convertTz, /* enableVectorizedReader = */ false, @@ -417,52 +417,6 @@ object Spark32HoodieParquetFileFormat { parquetReadSupport.asInstanceOf[ParquetReadSupport] } - // TODO scala-doc - // Spark 3.2.0 - // scalastyle:off - def int96RebaseMode(lookupFileMeta: String => String, - modeByConfig: String): LegacyBehaviorPolicy.Value = { - if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") { - return LegacyBehaviorPolicy.CORRECTED - } - // If there is no version, we return the mode specified by the config. - Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version => - // Files written by Spark 3.0 and earlier follow the legacy hybrid calendar and we need to - // rebase the INT96 timestamp values. - // Files written by Spark 3.1 and latter may also need the rebase if they were written with - // the "LEGACY" rebase mode. 
- if (version < "3.1.0" || lookupFileMeta("org.apache.spark.legacyINT96") != null) { - LegacyBehaviorPolicy.LEGACY - } else { - LegacyBehaviorPolicy.CORRECTED - } - }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig)) - } - // scalastyle:on - - // TODO scala-doc - // Spark 3.2.0 - // scalastyle:off - def datetimeRebaseMode(lookupFileMeta: String => String, - modeByConfig: String): LegacyBehaviorPolicy.Value = { - if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") { - return LegacyBehaviorPolicy.CORRECTED - } - // If there is no version, we return the mode specified by the config. - Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version => - // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to - // rebase the datetime values. - // Files written by Spark 3.0 and latter may also need the rebase if they were written with - // the "LEGACY" rebase mode. - if (version < "3.0.0" || lookupFileMeta("org.apache.spark.legacyDateTime") != null) { - LegacyBehaviorPolicy.LEGACY - } else { - LegacyBehaviorPolicy.CORRECTED - } - }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig)) - } - // scalastyle:on - def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = { val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr) if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {