This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 7c766c33ddc5171743181767d1c6ce8d237ab5db
Author: Gary Li <[email protected]>
AuthorDate: Thu May 14 22:55:25 2020 -0700

    HUDI-528 Handle empty commit in incremental pulling (#1612)
---
 .../hudi/common/table/TableSchemaResolver.java     | 412 +++++++++++++++++++++
 .../apache/hudi/common/util/HoodieAvroUtils.java   |  14 +
 .../org/apache/hudi/IncrementalRelation.scala      |  42 +--
 hudi-spark/src/test/scala/TestDataSource.scala     |   8 +
 4 files changed, 452 insertions(+), 24 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
new file mode 100644
index 0000000..d623b5a
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.HoodieAvroUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.InvalidTableException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+
+/**
+ * Helper class to read schema from data files and log files and to convert it 
between different formats.
+ */
+public class TableSchemaResolver {
+
+  private static final Logger LOG = 
LogManager.getLogger(TableSchemaResolver.class);
+  private HoodieTableMetaClient metaClient;
+
+  public TableSchemaResolver(HoodieTableMetaClient metaClient) {
+    this.metaClient = metaClient;
+  }
+
+  /**
+   * Gets the schema for a hoodie table. Depending on the type of table, read 
from any file written in the latest
+   * commit. We will assume that the schema has not changed within a single 
atomic write.
+   *
+   * @return Parquet schema for this table
+   * @throws Exception
+   */
+  private MessageType getTableParquetSchemaFromDataFile() throws Exception {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+
+    try {
+      switch (metaClient.getTableType()) {
+        case COPY_ON_WRITE:
+          // If this is COW, get the last commit and read the schema from a 
file written in the
+          // last commit
+          HoodieInstant lastCommit =
+              
activeTimeline.getCommitsTimeline().filterCompletedInstants().lastInstant().orElseThrow(()
 -> new InvalidTableException(metaClient.getBasePath()));
+          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+              .fromBytes(activeTimeline.getInstantDetails(lastCommit).get(), 
HoodieCommitMetadata.class);
+          String filePath = 
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
+              .orElseThrow(() -> new IllegalArgumentException("Could not find 
any data file written for commit "
+                  + lastCommit + ", could not get schema for table " + 
metaClient.getBasePath() + ", Metadata :"
+                  + commitMetadata));
+          return readSchemaFromBaseFile(new Path(filePath));
+        case MERGE_ON_READ:
+          // If this is MOR, depending on whether the latest commit is a delta 
commit or
+          // compaction commit
+          // Get a datafile written and get the schema from that file
+          Option<HoodieInstant> lastCompactionCommit =
+              
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
+          LOG.info("Found the last compaction commit as " + 
lastCompactionCommit);
+
+          Option<HoodieInstant> lastDeltaCommit;
+          if (lastCompactionCommit.isPresent()) {
+            lastDeltaCommit = 
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
+                .findInstantsAfter(lastCompactionCommit.get().getTimestamp(), 
Integer.MAX_VALUE).lastInstant();
+          } else {
+            lastDeltaCommit =
+                
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
+          }
+          LOG.info("Found the last delta commit " + lastDeltaCommit);
+
+          if (lastDeltaCommit.isPresent()) {
+            HoodieInstant lastDeltaInstant = lastDeltaCommit.get();
+            // read the schema from the log file that was written
+            commitMetadata = 
HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastDeltaInstant).get(),
+                HoodieCommitMetadata.class);
+            Pair<String, HoodieFileFormat> filePathWithFormat =
+                
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
+                    .filter(s -> 
s.contains(HoodieLogFile.DELTA_EXTENSION)).findAny()
+                    .map(f -> Pair.of(f, 
HoodieFileFormat.HOODIE_LOG)).orElseGet(() -> {
+                      // No Log files in Delta-Commit. Check if there are any 
parquet files
+                      return 
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream()
+                      .filter(s -> 
s.contains((metaClient.getTableConfig().getBaseFileFormat().getFileExtension())))
+                      .findAny().map(f -> Pair.of(f, 
HoodieFileFormat.PARQUET)).orElseThrow(() ->
+                          new IllegalArgumentException("Could not find any 
data file written for commit "
+                              + lastDeltaInstant + ", could not get schema for 
table " + metaClient.getBasePath()
+                              + ", CommitMetadata :" + commitMetadata));
+                    });
+            switch (filePathWithFormat.getRight()) {
+              case HOODIE_LOG:
+                return readSchemaFromLogFile(lastCompactionCommit, new 
Path(filePathWithFormat.getLeft()));
+              case PARQUET:
+                return readSchemaFromBaseFile(new 
Path(filePathWithFormat.getLeft()));
+              default:
+                throw new IllegalArgumentException("Unknown file format :" + 
filePathWithFormat.getRight()
+                    + " for file " + filePathWithFormat.getLeft());
+            }
+          } else {
+            return readSchemaFromLastCompaction(lastCompactionCommit);
+          }
+        default:
+          LOG.error("Unknown table type " + metaClient.getTableType());
+          throw new InvalidTableException(metaClient.getBasePath());
+      }
+    } catch (IOException e) {
+      throw new HoodieException("Failed to read data schema", e);
+    }
+  }
+
+  private Schema getTableAvroSchemaFromDataFile() throws Exception {
+    return convertParquetSchemaToAvro(getTableParquetSchemaFromDataFile());
+  }
+
+  /**
+   * Gets full schema (user + metadata) for a hoodie table in Avro format.
+   *
+   * @return Avro schema for this table
+   * @throws Exception
+   */
+  public Schema getTableAvroSchema() throws Exception {
+    Option<Schema> schemaFromCommitMetadata = 
getTableSchemaFromCommitMetadata(true);
+    return schemaFromCommitMetadata.isPresent() ? 
schemaFromCommitMetadata.get() : getTableAvroSchemaFromDataFile();
+  }
+
+  /**
+   * Gets full schema (user + metadata) for a hoodie table in Parquet format.
+   *
+   * @return Parquet schema for the table
+   * @throws Exception
+   */
+  public MessageType getTableParquetSchema() throws Exception {
+    Option<Schema> schemaFromCommitMetadata = 
getTableSchemaFromCommitMetadata(true);
+    return schemaFromCommitMetadata.isPresent() ? 
convertAvroSchemaToParquet(schemaFromCommitMetadata.get()) :
+        getTableParquetSchemaFromDataFile();
+  }
+
+  /**
+   * Gets the user's data schema for a hoodie table in Avro format.
+   *
+   * @return Avro user data schema
+   * @throws Exception
+   */
+  public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception {
+    Option<Schema> schemaFromCommitMetadata = 
getTableSchemaFromCommitMetadata(false);
+    return schemaFromCommitMetadata.isPresent() ? 
schemaFromCommitMetadata.get() :
+        HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+  }
+
+  /**
+   * Gets the schema for a hoodie table in Avro format from the 
HoodieCommitMetadata of the last commit.
+   *
+   * @return Avro schema for this table
+   */
+  private Option<Schema> getTableSchemaFromCommitMetadata(boolean 
includeMetadataFields) {
+    try {
+      HoodieTimeline timeline = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      byte[] data = 
timeline.getInstantDetails(timeline.lastInstant().get()).get();
+      HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, 
HoodieCommitMetadata.class);
+      String existingSchemaStr = 
metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
+
+      if (StringUtils.isNullOrEmpty(existingSchemaStr)) {
+        return Option.empty();
+      }
+
+      Schema schema = new Schema.Parser().parse(existingSchemaStr);
+      if (includeMetadataFields) {
+        schema = HoodieAvroUtils.addMetadataFields(schema);
+      }
+      return Option.of(schema);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to read schema from commit metadata", 
e);
+    }
+  }
+
+  /**
+   * Convert a Parquet schema to the Avro format.
+   *
+   * @param parquetSchema The parquet schema to convert
+   * @return The converted avro schema
+   */
+  public Schema convertParquetSchemaToAvro(MessageType parquetSchema) {
+    AvroSchemaConverter avroSchemaConverter = new 
AvroSchemaConverter(metaClient.getHadoopConf());
+    return avroSchemaConverter.convert(parquetSchema);
+  }
+
+  /**
+   * Convert an Avro schema to the Parquet format.
+   *
+   * @param schema The avro schema to convert
+   * @return The converted parquet schema
+   */
+  public MessageType convertAvroSchemaToParquet(Schema schema) {
+    AvroSchemaConverter avroSchemaConverter = new 
AvroSchemaConverter(metaClient.getHadoopConf());
+    return avroSchemaConverter.convert(schema);
+  }
+
+  /**
+   * HUDI specific validation of schema evolution. Ensures that a newer schema 
can be used for the dataset by
+   * checking if the data written using the old schema can be read using the 
new schema.
+   * <p>
+   * HUDI requires a Schema to be specified in HoodieWriteConfig and is used 
by the HoodieWriteClient to
+   * create the records. The schema is also saved in the data files (parquet 
format) and log files (avro format).
+   * Since a schema is required each time new data is ingested into a HUDI 
dataset, schema can be evolved over time.
+   * <p>
+   * New Schema is compatible only if:
+   * A1. There is no change in schema
+   * A2. A field has been added and it has a default value specified
+   * <p>
+   * New Schema is incompatible if:
+   * B1. A field has been deleted
+   * B2. A field has been renamed (treated as delete + add)
+   * B3. A field's type has changed to be incompatible with the older type
+   * <p>
+   * Issue with org.apache.avro.SchemaCompatibility:
+   * org.apache.avro.SchemaCompatibility checks schema compatibility between a 
writer schema (which originally wrote
+   * the AVRO record) and a readerSchema (with which we are reading the 
record). It ONLY guarantees that each
+   * field in the reader record can be populated from the writer record. 
Hence, if the reader schema is missing a
+   * field, it is still compatible with the writer schema.
+   * <p>
+   * In other words, org.apache.avro.SchemaCompatibility was written to 
guarantee that we can read the data written
+   * earlier. It does not guarantee schema evolution for HUDI (B1 above).
+   * <p>
+   * Implementation: This function implements specific HUDI specific checks 
(listed below) and defers the remaining
+   * checks to the org.apache.avro.SchemaCompatibility code.
+   * <p>
+   * Checks:
+   * C1. If there is no change in schema: success
+   * C2. If a field has been deleted in new schema: failure
+   * C3. If a field has been added in new schema: it should have default value 
specified
+   * C4. If a field has been renamed(treated as delete + add): failure
+   * C5. If a field type has changed: failure
+   *
+   * @param oldSchema Older schema to check.
+   * @param newSchema Newer schema to check.
+   * @return True if the schema validation is successful
+   */
+  public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) 
{
+    if (oldSchema.getType() == newSchema.getType() && newSchema.getType() == 
Schema.Type.RECORD) {
+      // record names must match:
+      if (!SchemaCompatibility.schemaNameEquals(oldSchema, newSchema)) {
+        return false;
+      }
+
+      // Check that each field in the oldSchema can populate the newSchema
+      for (final Field oldSchemaField : oldSchema.getFields()) {
+        final Field newSchemaField = 
SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField);
+        if (newSchemaField == null) {
+          // C4 or C2: the oldSchema field does not correspond to any field in the 
newSchema (deleted or renamed)
+          return false;
+        } else {
+          if (!isSchemaCompatible(oldSchemaField.schema(), 
newSchemaField.schema())) {
+            // C5: The fields do not have a compatible type
+            return false;
+          }
+        }
+      }
+
+      // Check that new fields added in newSchema have default values as they 
will not be
+      // present in oldSchema and hence cannot be populated on reading records 
from existing data.
+      for (final Field newSchemaField : newSchema.getFields()) {
+        final Field oldSchemaField = 
SchemaCompatibility.lookupWriterField(oldSchema, newSchemaField);
+        if (oldSchemaField == null) {
+          if (newSchemaField.defaultValue() == null) {
+            // C3: newly added field in newSchema does not have a default value
+            return false;
+          }
+        }
+      }
+
+      // All fields in the newSchema record can be populated from the 
oldSchema record
+      return true;
+    } else {
+      // Use the checks implemented by
+      org.apache.avro.SchemaCompatibility.SchemaPairCompatibility compatResult 
=
+          
org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(oldSchema, 
newSchema);
+      return compatResult.getType() == 
org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
+    }
+  }
+
+  public static boolean isSchemaCompatible(String oldSchema, String newSchema) 
{
+    return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new 
Schema.Parser().parse(newSchema));
+  }
+
+  /**
+   * Read the parquet schema from a parquet File.
+   */
+  public MessageType readSchemaFromBaseFile(Path parquetFilePath) throws 
IOException {
+    LOG.info("Reading schema from " + parquetFilePath);
+
+    FileSystem fs = metaClient.getRawFs();
+    if (!fs.exists(parquetFilePath)) {
+      throw new IllegalArgumentException(
+          "Failed to read schema from data file " + parquetFilePath + ". File 
does not exist.");
+    }
+    ParquetMetadata fileFooter =
+        ParquetFileReader.readFooter(fs.getConf(), parquetFilePath, 
ParquetMetadataConverter.NO_FILTER);
+    return fileFooter.getFileMetaData().getSchema();
+  }
+
+  /**
+   * Read schema from a data file from the last compaction commit done.
+   *
+   * @throws Exception
+   */
+  public MessageType readSchemaFromLastCompaction(Option<HoodieInstant> 
lastCompactionCommitOpt) throws Exception {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+
+    HoodieInstant lastCompactionCommit = 
lastCompactionCommitOpt.orElseThrow(() -> new Exception(
+        "Could not read schema from last compaction, no compaction commits 
found on path " + metaClient));
+
+    // Read from the compacted file that was written
+    HoodieCommitMetadata compactionMetadata = HoodieCommitMetadata
+        
.fromBytes(activeTimeline.getInstantDetails(lastCompactionCommit).get(), 
HoodieCommitMetadata.class);
+    String filePath = 
compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
+        .orElseThrow(() -> new IllegalArgumentException("Could not find any 
data file written for compaction "
+            + lastCompactionCommit + ", could not get schema for table " + 
metaClient.getBasePath()));
+    return readSchemaFromBaseFile(new Path(filePath));
+  }
+
+  /**
+   * Read the schema from the log file on path.
+   *
+   * @return
+   */
+  public MessageType readSchemaFromLogFile(Path path) throws IOException {
+    FileSystem fs = metaClient.getRawFs();
+    Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), 
null);
+    HoodieAvroDataBlock lastBlock = null;
+    while (reader.hasNext()) {
+      HoodieLogBlock block = reader.next();
+      if (block instanceof HoodieAvroDataBlock) {
+        lastBlock = (HoodieAvroDataBlock) block;
+      }
+    }
+    reader.close();
+    if (lastBlock != null) {
+      return new AvroSchemaConverter().convert(lastBlock.getSchema());
+    }
+    return null;
+  }
+
+  /**
+   * Read the schema from the log file on path.
+   *
+   * @throws Exception
+   */
+  public MessageType readSchemaFromLogFile(Option<HoodieInstant> 
lastCompactionCommitOpt, Path path)
+      throws Exception {
+    MessageType messageType = readSchemaFromLogFile(path);
+    // Fall back to read the schema from last compaction
+    if (messageType == null) {
+      LOG.info("Falling back to read the schema from last compaction " + 
lastCompactionCommitOpt);
+      return readSchemaFromLastCompaction(lastCompactionCommitOpt);
+    }
+    return messageType;
+  }
+
+  /**
+   * Read the schema from the log file on path.
+   *
+   * @return
+   */
+  public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) 
throws IOException {
+    Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), 
null);
+    HoodieAvroDataBlock lastBlock = null;
+    while (reader.hasNext()) {
+      HoodieLogBlock block = reader.next();
+      if (block instanceof HoodieAvroDataBlock) {
+        lastBlock = (HoodieAvroDataBlock) block;
+      }
+    }
+    reader.close();
+    if (lastBlock != null) {
+      return new AvroSchemaConverter().convert(lastBlock.getSchema());
+    }
+    return null;
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java
index 32ea71e..e9dddff 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java
@@ -97,6 +97,10 @@ public class HoodieAvroUtils {
         || HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName);
   }
 
+  public static Schema createHoodieWriteSchema(Schema originalSchema) {
+    return HoodieAvroUtils.addMetadataFields(originalSchema);
+  }
+
   /**
    * Adds the Hoodie metadata fields to the given schema.
    */
@@ -177,6 +181,16 @@ public class HoodieAvroUtils {
     return newSchema;
   }
 
+  public static Schema removeMetadataFields(Schema schema) {
+    List<Schema.Field> filteredFields = schema.getFields()
+        .stream()
+        .filter(field -> 
!HoodieRecord.HOODIE_META_COLUMNS.contains(field.name()))
+        .collect(Collectors.toList());
+    Schema filteredSchema = Schema.createRecord(schema.getName(), 
schema.getDoc(), schema.getNamespace(), false);
+    filteredSchema.setFields(filteredFields);
+    return filteredSchema;
+  }
+
   /**
    * Adds the Hoodie commit metadata into the provided Generic Record.
    */
diff --git 
a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala 
b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index a9e7389..9fca419 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -18,10 +18,9 @@
 package org.apache.hudi
 
 import org.apache.hadoop.fs.GlobPattern
-import org.apache.hadoop.fs.Path
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, 
HoodieTableType}
-import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.util.ParquetUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.util.HoodieAvroUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.table.HoodieTable
@@ -35,11 +34,11 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable
 
 /**
-  * Relation, that implements the Hoodie incremental view.
-  *
-  * Implemented for Copy_on_write storage.
-  *
-  */
+ * Relation, that implements the Hoodie incremental view.
+ *
+ * Implemented for Copy_on_write storage.
+ *
+ */
 class IncrementalRelation(val sqlContext: SQLContext,
                           val basePath: String,
                           val optParams: Map[String, String],
@@ -47,8 +46,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
 
   private val log = LogManager.getLogger(classOf[IncrementalRelation])
 
-  val fs = new 
Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-  val metaClient = new 
HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, 
true)
+  private val metaClient = new 
HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, 
true)
   // MOR tables not supported yet
   if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
     throw new HoodieException("Incremental view not implemented yet, for 
merge-on-read tables")
@@ -56,7 +54,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
   // TODO : Figure out a valid HoodieWriteConfig
   private val hoodieTable = HoodieTable.getHoodieTable(metaClient, 
HoodieWriteConfig.newBuilder().withPath(basePath).build(),
     sqlContext.sparkContext)
-  val commitTimeline = 
hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants()
+  private val commitTimeline = 
hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants()
   if (commitTimeline.empty()) {
     throw new HoodieException("No instants to incrementally pull")
   }
@@ -65,25 +63,21 @@ class IncrementalRelation(val sqlContext: SQLContext,
       s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY}")
   }
 
-  val lastInstant = commitTimeline.lastInstant().get()
+  private val lastInstant = commitTimeline.lastInstant().get()
 
-  val commitsToReturn = commitTimeline.findInstantsInRange(
+  private val commitsToReturn = commitTimeline.findInstantsInRange(
     optParams(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY),
     optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, 
lastInstant.getTimestamp))
     .getInstants.iterator().toList
 
-  // use schema from a file produced in the latest instant
-  val latestSchema = {
-    // use last instant if instant range is empty
-    val instant = commitsToReturn.lastOption.getOrElse(lastInstant)
-    val latestMeta = HoodieCommitMetadata
-          .fromBytes(commitTimeline.getInstantDetails(instant).get, 
classOf[HoodieCommitMetadata])
-    val metaFilePath = 
latestMeta.getFileIdAndFullPaths(basePath).values().iterator().next()
-    
AvroConversionUtils.convertAvroSchemaToStructType(ParquetUtils.readAvroSchema(
-      sqlContext.sparkContext.hadoopConfiguration, new Path(metaFilePath)))
+  // use schema from latest metadata, if not present, read schema from the 
data file
+  private val latestSchema = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields);
+    AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
   }
 
-  val filters = {
+  private val filters = {
     if 
(optParams.contains(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY)) {
       val filterStr = optParams.getOrElse(
         DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY,
@@ -106,7 +100,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
     val pathGlobPattern = optParams.getOrElse(
       DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY,
       DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)
-    val filteredFullPath = 
if(!pathGlobPattern.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL))
 {
+    val filteredFullPath = if 
(!pathGlobPattern.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)) 
{
       val globMatcher = new GlobPattern("*" + pathGlobPattern)
       fileIdToFullPath.filter(p => globMatcher.matches(p._2))
     } else {
diff --git a/hudi-spark/src/test/scala/TestDataSource.scala 
b/hudi-spark/src/test/scala/TestDataSource.scala
index 6bd32b4..ab44c76 100644
--- a/hudi-spark/src/test/scala/TestDataSource.scala
+++ b/hudi-spark/src/test/scala/TestDataSource.scala
@@ -126,6 +126,14 @@ class TestDataSource extends AssertionsForJUnit {
     assertEquals(1, countsPerCommit.length)
     assertEquals(firstCommit, countsPerCommit(0).get(0))
 
+    // Upsert an empty dataFrame
+    val emptyRecords = 
DataSourceTestUtils.convertToStringList(dataGen.generateUpdates("002", 
0)).toList
+    val emptyDF: Dataset[Row] = 
spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
+    emptyDF.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
     // pull the latest commit
     val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)

Reply via email to