This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch rc3-patched-for-test
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit f58876cef0b1e6dfc6f56c1fe45b0090d2cfffd7
Author: Alexey Kudinkin <ale...@infinilake.com>
AuthorDate: Wed Apr 20 14:27:42 2022 -0700

    Handle incompatibilities b/w Spark 3.2.0 and 3.2.1 in `Spark32HoodieParquetFileFormat`
---
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   |   8 +-
 .../parquet/Spark32HoodieParquetFileFormat.scala   | 183 +++++++++++++++++----
 2 files changed, 160 insertions(+), 31 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 54bc06bd76..7a8f8a1580 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -53,13 +53,15 @@ object HoodieSparkUtils extends SparkAdapterSupport {
 
   def isSpark3_1: Boolean = SPARK_VERSION.startsWith("3.1")
 
+  def gteqSpark3_1: Boolean = SPARK_VERSION > "3.1"
+
+  def gteqSpark3_1_3: Boolean = SPARK_VERSION >= "3.1.3"
+
   def isSpark3_2: Boolean = SPARK_VERSION.startsWith("3.2")
 
   def gteqSpark3_2: Boolean = SPARK_VERSION > "3.2"
 
-  def gteqSpark3_1: Boolean = SPARK_VERSION > "3.1"
-
-  def gteqSpark3_1_3: Boolean = SPARK_VERSION >= "3.1.3"
+  def gteqSpark3_2_1: Boolean = SPARK_VERSION >= "3.2.1"
 
   def getMetaSchema: StructType = {
     StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
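The relocated `gteqSpark3_1`/`gteqSpark3_1_3` helpers and the new `gteqSpark3_2_1`
gate all rely on plain lexicographic comparison of Spark's `SPARK_VERSION` string.
A minimal, self-contained sketch of how such a gate evaluates; the hard-coded
`sparkVersion` value and the `VersionGateSketch` name are illustrative, not part
of this patch:

    object VersionGateSketch extends App {
      // Illustrative stand-in for org.apache.spark.SPARK_VERSION.
      val sparkVersion: String = "3.2.1"

      // Same shape as HoodieSparkUtils.gteqSpark3_2_1 above: a lexicographic
      // String comparison against a version literal. This orders 3.2.x patch
      // releases correctly, since "3.2.0" < "3.2.1" < "3.2.2" as strings.
      def gteqSpark3_2_1: Boolean = sparkVersion >= "3.2.1"

      println(s"gteqSpark3_2_1 = $gteqSpark3_2_1") // prints: gteqSpark3_2_1 = true
    }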
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
index f2a0a21df8..dfeedd7ae4 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
@@ -22,11 +22,12 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.util.InternalSchemaCache
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
 import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.common.util.{InternalSchemaCache, ReflectionUtils}
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.action.InternalSchemaMerger
 import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
@@ -35,17 +36,18 @@ import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat.{pruneInternalSchema, rebuildFilterFromParquet}
+import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat._
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 import java.net.URI
 
@@ -158,21 +160,38 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
       lazy val footerFileMetaData =
         ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
-      val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
-        footerFileMetaData.getKeyValueMetaData.get,
-        datetimeRebaseModeInRead)
       // Try to push down filters when filter push-down is enabled.
       val pushed = if (enableParquetFilterPushDown) {
         val parquetSchema = footerFileMetaData.getSchema
-        val parquetFilters = new ParquetFilters(
-          parquetSchema,
-          pushDownDate,
-          pushDownTimestamp,
-          pushDownDecimal,
-          pushDownStringStartWith,
-          pushDownInFilterThreshold,
-          isCaseSensitive,
-          datetimeRebaseSpec)
+        val parquetFilters = if (HoodieSparkUtils.gteqSpark3_2_1) {
+          // NOTE: Below code could only be compiled against >= Spark 3.2.1,
+          //       and unfortunately won't compile against Spark 3.2.0
+          //       However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1
+          val datetimeRebaseSpec =
+            DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+          new ParquetFilters(
+            parquetSchema,
+            pushDownDate,
+            pushDownTimestamp,
+            pushDownDecimal,
+            pushDownStringStartWith,
+            pushDownInFilterThreshold,
+            isCaseSensitive,
+            datetimeRebaseSpec)
+        } else {
+          // Spark 3.2.0
+          val datetimeRebaseMode =
+            Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+          createParquetFilters(
+            parquetSchema,
+            pushDownDate,
+            pushDownTimestamp,
+            pushDownDecimal,
+            pushDownStringStartWith,
+            pushDownInFilterThreshold,
+            isCaseSensitive,
+            datetimeRebaseMode)
+        }
         filters.map(rebuildFilterFromParquet(_, fileSchema, querySchemaOption.orElse(null)))
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -198,10 +217,6 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
         None
       }
 
-      val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
-        footerFileMetaData.getKeyValueMetaData.get,
-        int96RebaseModeInRead)
-
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
 
       // Clone new conf
@@ -225,6 +240,10 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
 
       if (enableVectorizedReader) {
         val vectorizedReader = if (shouldUseInternalSchema) {
+          val int96RebaseSpec =
+            DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+          val datetimeRebaseSpec =
+            DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
           new Spark32HoodieVectorizedParquetRecordReader(
             convertTz.orNull,
             datetimeRebaseSpec.mode.toString,
@@ -234,7 +253,14 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
             enableOffHeapColumnVector && taskContext.isDefined,
             capacity,
             typeChangeInfos)
-        } else {
+        } else if (HoodieSparkUtils.gteqSpark3_2_1) {
+          // NOTE: Below code could only be compiled against >= Spark 3.2.1,
+          //       and unfortunately won't compile against Spark 3.2.0
+          //       However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1
+          val int96RebaseSpec =
+            DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+          val datetimeRebaseSpec =
+            DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
           new VectorizedParquetRecordReader(
             convertTz.orNull,
             datetimeRebaseSpec.mode.toString,
@@ -243,7 +269,20 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
             int96RebaseSpec.timeZone,
             enableOffHeapColumnVector && taskContext.isDefined,
             capacity)
+        } else {
+          // Spark 3.2.0
+          val datetimeRebaseMode =
+            Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+          val int96RebaseMode =
+            Spark32HoodieParquetFileFormat.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+          createVectorizedParquetRecordReader(
+            convertTz.orNull,
+            datetimeRebaseMode.toString,
+            int96RebaseMode.toString,
+            enableOffHeapColumnVector && taskContext.isDefined,
+            capacity)
         }
+
       // SPARK-37089: We cannot register a task completion listener to close this iterator here
       // because downstream exec nodes have already registered their listeners. Since listeners
      // are executed in reverse order of registration, a listener registered here would close the
@@ -279,12 +318,32 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
         }
       } else {
         logDebug(s"Falling back to parquet-mr")
-        // ParquetRecordReader returns InternalRow
-        val readSupport = new ParquetReadSupport(
-          convertTz,
-          enableVectorizedReader = false,
-          datetimeRebaseSpec,
-          int96RebaseSpec)
+        val readSupport = if (HoodieSparkUtils.gteqSpark3_2_1) {
+          // ParquetRecordReader returns InternalRow
+          // NOTE: Below code could only be compiled against >= Spark 3.2.1,
+          //       and unfortunately won't compile against Spark 3.2.0
+          //       However this code is runtime-compatible w/ both Spark 3.2.0 and >= Spark 3.2.1
+          val int96RebaseSpec =
+            DataSourceUtils.int96RebaseSpec(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+          val datetimeRebaseSpec =
+            DataSourceUtils.datetimeRebaseSpec(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+          new ParquetReadSupport(
+            convertTz,
+            enableVectorizedReader = false,
+            datetimeRebaseSpec,
+            int96RebaseSpec)
+        } else {
+          val datetimeRebaseMode =
+            Spark32HoodieParquetFileFormat.datetimeRebaseMode(footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead)
+          val int96RebaseMode =
+            Spark32HoodieParquetFileFormat.int96RebaseMode(footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+          createParquetReadSupport(
+            convertTz,
+            /* enableVectorizedReader = */ false,
+            datetimeRebaseMode,
+            int96RebaseMode)
+        }
+
         val reader = if (pushed.isDefined && enableRecordFilter) {
           val parquetFilter = FilterCompat.get(pushed.get, null)
           new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
@@ -332,10 +391,78 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
       }
     }
   }
+
 }
 
 object Spark32HoodieParquetFileFormat {
+  private val PARQUET_FILTERS_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetFilters"
+  private val PARQUET_VECTORIZED_READER_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader"
+  private val PARQUET_READ_SUPPORT_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport"
+
+  private def createParquetFilters(args: Any*): ParquetFilters = {
+    val parquetFiltersInstance = ReflectionUtils.loadClass(PARQUET_FILTERS_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
+    parquetFiltersInstance.asInstanceOf[ParquetFilters]
+  }
+
+  private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = {
+    val vectorizedRecordReader =
+      ReflectionUtils.loadClass(PARQUET_VECTORIZED_READER_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
+    vectorizedRecordReader.asInstanceOf[VectorizedParquetRecordReader]
+  }
+
+  private def createParquetReadSupport(args: Any*): ParquetReadSupport = {
+    val parquetReadSupport =
+      ReflectionUtils.loadClass(PARQUET_READ_SUPPORT_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
+    parquetReadSupport.asInstanceOf[ParquetReadSupport]
+  }
+
+  // TODO scala-doc
+  // Spark 3.2.0
+  // scalastyle:off
+  def int96RebaseMode(lookupFileMeta: String => String,
+                      modeByConfig: String): LegacyBehaviorPolicy.Value = {
+    if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
+      return LegacyBehaviorPolicy.CORRECTED
+    }
+    // If there is no version, we return the mode specified by the config.
+    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
+      // Files written by Spark 3.0 and earlier follow the legacy hybrid calendar and we need to
+      // rebase the INT96 timestamp values.
+      // Files written by Spark 3.1 and later may also need the rebase if they were written with
+      // the "LEGACY" rebase mode.
+      if (version < "3.1.0" || lookupFileMeta("org.apache.spark.legacyINT96") != null) {
+        LegacyBehaviorPolicy.LEGACY
+      } else {
+        LegacyBehaviorPolicy.CORRECTED
+      }
+    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
+  }
+  // scalastyle:on
+
+  // TODO scala-doc
+  // Spark 3.2.0
+  // scalastyle:off
+  def datetimeRebaseMode(lookupFileMeta: String => String,
+                         modeByConfig: String): LegacyBehaviorPolicy.Value = {
+    if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
+      return LegacyBehaviorPolicy.CORRECTED
+    }
+    // If there is no version, we return the mode specified by the config.
+    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
+      // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to
+      // rebase the datetime values.
+      // Files written by Spark 3.0 and later may also need the rebase if they were written with
+      // the "LEGACY" rebase mode.
+      if (version < "3.0.0" || lookupFileMeta("org.apache.spark.legacyDateTime") != null) {
+        LegacyBehaviorPolicy.LEGACY
+      } else {
+        LegacyBehaviorPolicy.CORRECTED
+      }
+    }.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
+  }
+  // scalastyle:on
+
   def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
     val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
     if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {
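Taken together, the pattern in this patch is: compile directly against the
constructor signatures that exist in Spark >= 3.2.1, and route the Spark 3.2.0
signatures through the reflective create* factories above so they never appear
at compile time. A self-contained sketch of that reflective-fallback idea;
`ExampleFilters`, `gteqNewRelease` and the constructor argument are illustrative
stand-ins, and the real code dispatches through Hudi's `ReflectionUtils.loadClass`
rather than raw `Class.forName`:

    // Illustrative stand-in for a Spark-internal class whose constructor
    // signature differs between two releases of the library.
    class ExampleFilters(mode: String) {
      override def toString: String = s"ExampleFilters(mode=$mode)"
    }

    object ReflectiveFallbackSketch {
      def main(args: Array[String]): Unit = {
        // Stand-in for the runtime probe HoodieSparkUtils.gteqSpark3_2_1.
        val gteqNewRelease = false

        val filters: AnyRef =
          if (gteqNewRelease) {
            // Newer signature: referenced directly, so the compiler checks it.
            new ExampleFilters("CORRECTED")
          } else {
            // Older signature: resolved reflectively at runtime. In the patch,
            // this branch passes the Spark 3.2.0 argument list, which would no
            // longer type-check if written as a direct constructor call when
            // building against the 3.2.1 classpath.
            Class.forName("ExampleFilters")
              .getConstructor(classOf[String])
              .newInstance("CORRECTED")
              .asInstanceOf[AnyRef]
          }

        println(filters) // prints: ExampleFilters(mode=CORRECTED)
      }
    }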