This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit f39538edca68df2eac2fe9a20b173262ebac17d2
Author: Vinish Reddy <[email protected]>
AuthorDate: Wed Jan 5 22:13:10 2022 +0530

    [HUDI-3168] Fixing null schema with empty commit in incremental relation (#4513)
---
 .../org/apache/hudi/IncrementalRelation.scala      | 154 +++++++++++----------
 .../sources/S3EventsHoodieIncrSource.java          |   4 +
 2 files changed, 86 insertions(+), 72 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 958a15e..1907108 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -17,8 +17,9 @@
 
 package org.apache.hudi
 
-import java.util.stream.Collectors
+import org.apache.avro.Schema
 
+import java.util.stream.Collectors
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieReplaceCommitMetadata, HoodieTableType}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
@@ -89,8 +90,13 @@ class IncrementalRelation(val sqlContext: SQLContext,
     } else {
       schemaResolver.getTableAvroSchemaWithoutMetadataFields()
     }
-    val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
-    StructType(skeletonSchema.fields ++ dataSchema.fields)
+    if (tableSchema.getType == Schema.Type.NULL) {
+      // if there is only one commit in the table and is an empty commit without schema, return empty RDD here
+      StructType(Nil)
+    } else {
+      val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+      StructType(skeletonSchema.fields ++ dataSchema.fields)
+    }
   }
 
   private val filters = optParams.getOrElse(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS.key,
@@ -99,86 +105,90 @@ class IncrementalRelation(val sqlContext: SQLContext,
   override def schema: StructType = usedSchema
 
   override def buildScan(): RDD[Row] = {
-    val regularFileIdToFullPath = mutable.HashMap[String, String]()
-    var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
-
-    // create Replaced file group
-    val replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline
-    val replacedFile = replacedTimeline.getInstants.collect(Collectors.toList[HoodieInstant]).flatMap { instant =>
-      val replaceMetadata = HoodieReplaceCommitMetadata.
-        fromBytes(metaClient.getActiveTimeline.getInstantDetails(instant).get, classOf[HoodieReplaceCommitMetadata])
-      replaceMetadata.getPartitionToReplaceFileIds.entrySet().flatMap { entry =>
-        entry.getValue.map { e =>
-          val fullPath = FSUtils.getPartitionPath(basePath, entry.getKey).toString
-          (e, fullPath)
+    if (usedSchema == StructType(Nil)) {
+      // if first commit in a table is an empty commit without schema, return empty RDD here
+      sqlContext.sparkContext.emptyRDD[Row]
+    } else {
+      val regularFileIdToFullPath = mutable.HashMap[String, String]()
+      var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
+
+      // create Replaced file group
+      val replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline
+      val replacedFile = replacedTimeline.getInstants.collect(Collectors.toList[HoodieInstant]).flatMap { instant =>
+        val replaceMetadata = HoodieReplaceCommitMetadata.
+          fromBytes(metaClient.getActiveTimeline.getInstantDetails(instant).get, classOf[HoodieReplaceCommitMetadata])
+        replaceMetadata.getPartitionToReplaceFileIds.entrySet().flatMap { entry =>
+          entry.getValue.map { e =>
+            val fullPath = FSUtils.getPartitionPath(basePath, entry.getKey).toString
+            (e, fullPath)
+          }
+        }
+      }.toMap
+
+      for (commit <- commitsToReturn) {
+        val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
+          .get, classOf[HoodieCommitMetadata])
+
+        if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
+          metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
+            replacedFile.contains(k) && v.startsWith(replacedFile(k))
+          }
+        } else {
+          regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
+            replacedFile.contains(k) && v.startsWith(replacedFile(k))
+          }
         }
       }
-    }.toMap
-    for (commit <- commitsToReturn) {
-      val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
-        .get, classOf[HoodieCommitMetadata])
+      if (metaBootstrapFileIdToFullPath.nonEmpty) {
+        // filer out meta bootstrap files that have had more commits since metadata bootstrap
+        metaBootstrapFileIdToFullPath = metaBootstrapFileIdToFullPath
+          .filterNot(fileIdFullPath => regularFileIdToFullPath.contains(fileIdFullPath._1))
+      }
-      if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
-        metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
-          replacedFile.contains(k) && v.startsWith(replacedFile(k))
-        }
-      } else {
-        regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
-          replacedFile.contains(k) && v.startsWith(replacedFile(k))
+      val pathGlobPattern = optParams.getOrElse(
+        DataSourceReadOptions.INCR_PATH_GLOB.key,
+        DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
+      val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = {
+        if (!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) {
+          val globMatcher = new GlobPattern("*" + pathGlobPattern)
+          (regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values,
+            metaBootstrapFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values)
+        } else {
+          (regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values)
         }
       }
-    }
-
-    if (metaBootstrapFileIdToFullPath.nonEmpty) {
-      // filer out meta bootstrap files that have had more commits since metadata bootstrap
-      metaBootstrapFileIdToFullPath = metaBootstrapFileIdToFullPath
-        .filterNot(fileIdFullPath => regularFileIdToFullPath.contains(fileIdFullPath._1))
-    }
-
-    val pathGlobPattern = optParams.getOrElse(
-      DataSourceReadOptions.INCR_PATH_GLOB.key,
-      DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
-    val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = {
-      if(!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) {
-        val globMatcher = new GlobPattern("*" + pathGlobPattern)
-        (regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values,
-          metaBootstrapFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values)
+      // unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view
+      // will filter out all the files incorrectly.
+      sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
+      val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
+      if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
+        sqlContext.sparkContext.emptyRDD[Row]
       } else {
-        (regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values)
-      }
-    }
-    // unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view
-    // will filter out all the files incorrectly.
-    sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
-    val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
-    if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
-      sqlContext.sparkContext.emptyRDD[Row]
-    } else {
-      log.info("Additional Filters to be applied to incremental source are :" + filters)
+        log.info("Additional Filters to be applied to incremental source are :" + filters)
-      var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
+        var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
-      if (metaBootstrapFileIdToFullPath.nonEmpty) {
-        df = sqlContext.sparkSession.read
-          .format("hudi")
-          .schema(usedSchema)
-          .option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
-          .load()
-      }
+        if (metaBootstrapFileIdToFullPath.nonEmpty) {
+          df = sqlContext.sparkSession.read
+            .format("hudi")
+            .schema(usedSchema)
+            .option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
+            .load()
+        }
-      if (regularFileIdToFullPath.nonEmpty)
-      {
-        df = df.union(sqlContext.read.options(sOpts)
-          .schema(usedSchema)
-          .parquet(filteredRegularFullPaths.toList: _*)
-          .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-            commitsToReturn.head.getTimestamp))
-          .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-            commitsToReturn.last.getTimestamp)))
-      }
+        if (regularFileIdToFullPath.nonEmpty) {
+          df = df.union(sqlContext.read.options(sOpts)
+            .schema(usedSchema)
+            .parquet(filteredRegularFullPaths.toList: _*)
+            .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+              commitsToReturn.head.getTimestamp))
+            .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+              commitsToReturn.last.getTimestamp)))
+        }
-      filters.foldLeft(df)((e, f) => e.filter(f)).rdd
+        filters.foldLeft(df)((e, f) => e.filter(f)).rdd
+      }
       }
     }
   }

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index ec789ab..f67fbcc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -105,6 +105,10 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
         .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), instantEndpts.getLeft())
         .option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight());
     Dataset<Row> source = metaReader.load(srcPath);
+
+    if (source.isEmpty()) {
+      return Pair.of(Option.empty(), instantEndpts.getRight());
+    }
     String filter = "s3.object.size > 0";
     if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX))) {
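For reviewers who want the core of the change in isolation: the sketch below is a simplified, hypothetical rendering of the guard this patch adds, not the actual IncrementalRelation code; the helper name and the function parameter are made up for illustration. It shows a NULL-type Avro schema, which is what an empty commit resolves to, being mapped to an empty Spark StructType instead of being converted to fields.

import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType

// Hypothetical helper mirroring the new branch in IncrementalRelation:
// an empty commit yields a NULL-type Avro schema, so expose an empty
// StructType rather than trying to convert it to Spark fields.
def resolveLatestSchema(avroSchema: Schema,
                        convert: Schema => StructType): StructType = {
  if (avroSchema.getType == Schema.Type.NULL) {
    StructType(Nil) // no schema yet; buildScan will return an empty RDD
  } else {
    convert(avroSchema)
  }
}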
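On the consumer side, the practical effect is that an incremental pull against a table whose only commit is empty now comes back as an empty DataFrame instead of failing on a null schema, which is what the early return added to S3EventsHoodieIncrSource relies on. A rough usage sketch, assuming a local SparkSession and a placeholder table path (the option keys are the standard incremental-query options in the 0.10.x line):

import org.apache.spark.sql.SparkSession

object IncrementalPullSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hudi-incr-pull-sketch").getOrCreate()

    // Placeholder path and begin instant; adjust for a real table.
    val incremental = spark.read
      .format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.read.begin.instanttime", "000")
      .load("/tmp/hudi/example_table")

    if (incremental.isEmpty) {
      // Only an empty commit (or nothing) in the requested range: skip this round,
      // the same way S3EventsHoodieIncrSource now returns early with an empty batch.
      println("No new data for this incremental pull")
    } else {
      incremental.createOrReplaceTempView("hudi_incremental")
      spark.sql("SELECT count(*) FROM hudi_incremental").show()
    }
  }
}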
