This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit f39538edca68df2eac2fe9a20b173262ebac17d2
Author: Vinish Reddy <[email protected]>
AuthorDate: Wed Jan 5 22:13:10 2022 +0530

    [HUDI-3168] Fixing null schema with empty commit in incremental relation (#4513)
---
 .../org/apache/hudi/IncrementalRelation.scala      | 154 +++++++++++----------
 .../sources/S3EventsHoodieIncrSource.java          |   4 +
 2 files changed, 86 insertions(+), 72 deletions(-)
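
The heart of the IncrementalRelation change is a guard on the resolved Avro schema type: a table whose only commit is empty resolves to the Avro NULL schema, and the relation now maps that to an empty StructType up front instead of failing during Avro-to-Spark conversion. A minimal, self-contained sketch of that guard (illustrative only; resolveUsedSchema is a hypothetical helper, and spark-avro's SchemaConverters stands in for Hudi's AvroConversionUtils):

    import org.apache.avro.Schema
    import org.apache.spark.sql.avro.SchemaConverters
    import org.apache.spark.sql.types.StructType

    // Hypothetical helper mirroring the committed guard, not the Hudi code itself.
    def resolveUsedSchema(tableSchema: Schema, skeletonSchema: StructType): StructType = {
      if (tableSchema.getType == Schema.Type.NULL) {
        // only an empty commit exists, so there is no data schema to expose
        StructType(Nil)
      } else {
        // stand-in for AvroConversionUtils.convertAvroSchemaToStructType
        val dataSchema = SchemaConverters.toSqlType(tableSchema).dataType.asInstanceOf[StructType]
        StructType(skeletonSchema.fields ++ dataSchema.fields)
      }
    }

    resolveUsedSchema(Schema.create(Schema.Type.NULL), StructType(Nil)) // => StructType(Nil)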

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 958a15e..1907108 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -17,8 +17,9 @@
 
 package org.apache.hudi
 
-import java.util.stream.Collectors
+import org.apache.avro.Schema
 
+import java.util.stream.Collectors
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieReplaceCommitMetadata, HoodieTableType}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
@@ -89,8 +90,13 @@ class IncrementalRelation(val sqlContext: SQLContext,
     } else {
       schemaResolver.getTableAvroSchemaWithoutMetadataFields()
     }
-    val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
-    StructType(skeletonSchema.fields ++ dataSchema.fields)
+    if (tableSchema.getType == Schema.Type.NULL) {
+      // if there is only one commit in the table and is an empty commit without schema, return empty RDD here
+      StructType(Nil)
+    } else {
+      val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+      StructType(skeletonSchema.fields ++ dataSchema.fields)
+    }
   }
 
   private val filters = optParams.getOrElse(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS.key,
@@ -99,86 +105,90 @@ class IncrementalRelation(val sqlContext: SQLContext,
   override def schema: StructType = usedSchema
 
   override def buildScan(): RDD[Row] = {
-    val regularFileIdToFullPath = mutable.HashMap[String, String]()
-    var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
-
-    // create Replaced file group
-    val replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline
-    val replacedFile = replacedTimeline.getInstants.collect(Collectors.toList[HoodieInstant]).flatMap { instant =>
-      val replaceMetadata = HoodieReplaceCommitMetadata.
-        fromBytes(metaClient.getActiveTimeline.getInstantDetails(instant).get, classOf[HoodieReplaceCommitMetadata])
-      replaceMetadata.getPartitionToReplaceFileIds.entrySet().flatMap { entry =>
-        entry.getValue.map { e =>
-          val fullPath = FSUtils.getPartitionPath(basePath, entry.getKey).toString
-          (e, fullPath)
+    if (usedSchema == StructType(Nil)) {
+      // if first commit in a table is an empty commit without schema, return empty RDD here
+      sqlContext.sparkContext.emptyRDD[Row]
+    } else {
+      val regularFileIdToFullPath = mutable.HashMap[String, String]()
+      var metaBootstrapFileIdToFullPath = mutable.HashMap[String, String]()
+
+      // create Replaced file group
+      val replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline
+      val replacedFile = replacedTimeline.getInstants.collect(Collectors.toList[HoodieInstant]).flatMap { instant =>
+        val replaceMetadata = HoodieReplaceCommitMetadata.
+          fromBytes(metaClient.getActiveTimeline.getInstantDetails(instant).get, classOf[HoodieReplaceCommitMetadata])
+        replaceMetadata.getPartitionToReplaceFileIds.entrySet().flatMap { entry =>
+          entry.getValue.map { e =>
+            val fullPath = FSUtils.getPartitionPath(basePath, entry.getKey).toString
+            (e, fullPath)
+          }
+        }
+      }.toMap
+
+      for (commit <- commitsToReturn) {
+        val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
+          .get, classOf[HoodieCommitMetadata])
+
+        if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
+          metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
+            replacedFile.contains(k) && v.startsWith(replacedFile(k))
+          }
+        } else {
+          regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
+            replacedFile.contains(k) && v.startsWith(replacedFile(k))
+          }
         }
       }
-    }.toMap
 
-    for (commit <- commitsToReturn) {
-      val metadata: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(commitTimeline.getInstantDetails(commit)
-        .get, classOf[HoodieCommitMetadata])
+      if (metaBootstrapFileIdToFullPath.nonEmpty) {
+        // filer out meta bootstrap files that have had more commits since metadata bootstrap
+        metaBootstrapFileIdToFullPath = metaBootstrapFileIdToFullPath
+          .filterNot(fileIdFullPath => regularFileIdToFullPath.contains(fileIdFullPath._1))
+      }
 
-      if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS == commit.getTimestamp) {
-        metaBootstrapFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
-          replacedFile.contains(k) && v.startsWith(replacedFile(k))
-        }
-      } else {
-        regularFileIdToFullPath ++= metadata.getFileIdAndFullPaths(basePath).toMap.filterNot { case (k, v) =>
-          replacedFile.contains(k) && v.startsWith(replacedFile(k))
+      val pathGlobPattern = optParams.getOrElse(
+        DataSourceReadOptions.INCR_PATH_GLOB.key,
+        DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
+      val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = {
+        if (!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) {
+          val globMatcher = new GlobPattern("*" + pathGlobPattern)
+          (regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values,
+            metaBootstrapFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values)
+        } else {
+          (regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values)
         }
       }
-    }
-
-    if (metaBootstrapFileIdToFullPath.nonEmpty) {
-      // filer out meta bootstrap files that have had more commits since metadata bootstrap
-      metaBootstrapFileIdToFullPath = metaBootstrapFileIdToFullPath
-        .filterNot(fileIdFullPath => regularFileIdToFullPath.contains(fileIdFullPath._1))
-    }
-
-    val pathGlobPattern = optParams.getOrElse(
-      DataSourceReadOptions.INCR_PATH_GLOB.key,
-      DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
-    val (filteredRegularFullPaths, filteredMetaBootstrapFullPaths) = {
-      if(!pathGlobPattern.equals(DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)) {
-        val globMatcher = new GlobPattern("*" + pathGlobPattern)
-        (regularFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values,
-          metaBootstrapFileIdToFullPath.filter(p => globMatcher.matches(p._2)).values)
+      // unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view
+      // will filter out all the files incorrectly.
+      sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
+      val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
+      if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
+        sqlContext.sparkContext.emptyRDD[Row]
       } else {
-        (regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values)
-      }
-    }
-    // unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view
-    // will filter out all the files incorrectly.
-    sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
-    val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
-    if (filteredRegularFullPaths.isEmpty && filteredMetaBootstrapFullPaths.isEmpty) {
-      sqlContext.sparkContext.emptyRDD[Row]
-    } else {
-      log.info("Additional Filters to be applied to incremental source are :" + filters)
+        log.info("Additional Filters to be applied to incremental source are :" + filters)
 
-      var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
+        var df: DataFrame = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
 
-      if (metaBootstrapFileIdToFullPath.nonEmpty) {
-        df = sqlContext.sparkSession.read
-               .format("hudi")
-               .schema(usedSchema)
-               .option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
-               .load()
-      }
+        if (metaBootstrapFileIdToFullPath.nonEmpty) {
+          df = sqlContext.sparkSession.read
+            .format("hudi")
+            .schema(usedSchema)
+          .option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
+            .load()
+        }
 
-      if (regularFileIdToFullPath.nonEmpty)
-      {
-        df = df.union(sqlContext.read.options(sOpts)
-                        .schema(usedSchema)
-                        .parquet(filteredRegularFullPaths.toList: _*)
-                        .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-                          commitsToReturn.head.getTimestamp))
-                        .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-                          commitsToReturn.last.getTimestamp)))
-      }
+        if (regularFileIdToFullPath.nonEmpty) {
+          df = df.union(sqlContext.read.options(sOpts)
+            .schema(usedSchema)
+            .parquet(filteredRegularFullPaths.toList: _*)
+            .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+              commitsToReturn.head.getTimestamp))
+            .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+              commitsToReturn.last.getTimestamp)))
+        }
 
-      filters.foldLeft(df)((e, f) => e.filter(f)).rdd
+        filters.foldLeft(df)((e, f) => e.filter(f)).rdd
+      }
     }
   }
 }
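
Reader-side effect of the change, as a hedged sketch (assumes a SparkSession named spark; the base path and begin instant below are placeholders): an incremental query over a table whose only commit is empty now returns an empty DataFrame instead of failing on a null Avro schema.

    import org.apache.hudi.DataSourceReadOptions

    val df = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "000")  // placeholder begin instant
      .load("/tmp/hudi/empty_table")                               // placeholder base path
    assert(df.isEmpty)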
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
index ec789ab..f67fbcc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java
@@ -105,6 +105,10 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
         .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), instantEndpts.getLeft())
         .option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight());
     Dataset<Row> source = metaReader.load(srcPath);
+    
+    if (source.isEmpty()) {
+      return Pair.of(Option.empty(), instantEndpts.getRight());
+    }
 
     String filter = "s3.object.size > 0";
     if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX))) {
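
The S3EventsHoodieIncrSource guard above is a plain short-circuit: when the incremental load comes back empty, skip the S3-event filtering and carry the end instant forward as the checkpoint. A rough Scala rendering (simplified types; fetchOrSkip is a hypothetical helper, not the actual HoodieIncrSource API):

    import org.apache.spark.sql.DataFrame

    def fetchOrSkip(source: DataFrame, endInstant: String): (Option[DataFrame], String) =
      if (source.isEmpty) (None, endInstant)                        // nothing new this round
      else (Some(source.filter("s3.object.size > 0")), endInstant)  // proceed with the size filter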
