This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.5.3 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 7c766c33ddc5171743181767d1c6ce8d237ab5db Author: Gary Li <[email protected]> AuthorDate: Thu May 14 22:55:25 2020 -0700 HUDI-528 Handle empty commit in incremental pulling (#1612) --- .../hudi/common/table/TableSchemaResolver.java | 412 +++++++++++++++++++++ .../apache/hudi/common/util/HoodieAvroUtils.java | 14 + .../org/apache/hudi/IncrementalRelation.scala | 42 +-- hudi-spark/src/test/scala/TestDataSource.scala | 8 + 4 files changed, 452 insertions(+), 24 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java new file mode 100644 index 0000000..d623b5a --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.table; + +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.log.HoodieLogFormat.Reader; +import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.HoodieAvroUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.InvalidTableException; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.SchemaCompatibility; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; + +import java.io.IOException; + +/** + * Helper class to read schema from data files and log files and to convert it between different formats. + */ +public class TableSchemaResolver { + + private static final Logger LOG = LogManager.getLogger(TableSchemaResolver.class); + private HoodieTableMetaClient metaClient; + + public TableSchemaResolver(HoodieTableMetaClient metaClient) { + this.metaClient = metaClient; + } + + /** + * Gets the schema for a hoodie table. 
Depending on the type of table, read from any file written in the latest + * commit. We will assume that the schema has not changed within a single atomic write. + * + * @return Parquet schema for this table + * @throws Exception + */ + private MessageType getTableParquetSchemaFromDataFile() throws Exception { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + try { + switch (metaClient.getTableType()) { + case COPY_ON_WRITE: + // If this is COW, get the last commit and read the schema from a file written in the + // last commit + HoodieInstant lastCommit = + activeTimeline.getCommitsTimeline().filterCompletedInstants().lastInstant().orElseThrow(() -> new InvalidTableException(metaClient.getBasePath())); + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata + .fromBytes(activeTimeline.getInstantDetails(lastCommit).get(), HoodieCommitMetadata.class); + String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny() + .orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for commit " + + lastCommit + ", could not get schema for table " + metaClient.getBasePath() + ", Metadata :" + + commitMetadata)); + return readSchemaFromBaseFile(new Path(filePath)); + case MERGE_ON_READ: + // If this is MOR, depending on whether the latest commit is a delta commit or + // compaction commit + // Get a datafile written and get the schema from that file + Option<HoodieInstant> lastCompactionCommit = + metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant(); + LOG.info("Found the last compaction commit as " + lastCompactionCommit); + + Option<HoodieInstant> lastDeltaCommit; + if (lastCompactionCommit.isPresent()) { + lastDeltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants() + .findInstantsAfter(lastCompactionCommit.get().getTimestamp(), Integer.MAX_VALUE).lastInstant(); + } else { + lastDeltaCommit = + 
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant(); + } + LOG.info("Found the last delta commit " + lastDeltaCommit); + + if (lastDeltaCommit.isPresent()) { + HoodieInstant lastDeltaInstant = lastDeltaCommit.get(); + // read from the log file wrote + commitMetadata = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastDeltaInstant).get(), + HoodieCommitMetadata.class); + Pair<String, HoodieFileFormat> filePathWithFormat = + commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream() + .filter(s -> s.contains(HoodieLogFile.DELTA_EXTENSION)).findAny() + .map(f -> Pair.of(f, HoodieFileFormat.HOODIE_LOG)).orElseGet(() -> { + // No Log files in Delta-Commit. Check if there are any parquet files + return commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream() + .filter(s -> s.contains((metaClient.getTableConfig().getBaseFileFormat().getFileExtension()))) + .findAny().map(f -> Pair.of(f, HoodieFileFormat.PARQUET)).orElseThrow(() -> + new IllegalArgumentException("Could not find any data file written for commit " + + lastDeltaInstant + ", could not get schema for table " + metaClient.getBasePath() + + ", CommitMetadata :" + commitMetadata)); + }); + switch (filePathWithFormat.getRight()) { + case HOODIE_LOG: + return readSchemaFromLogFile(lastCompactionCommit, new Path(filePathWithFormat.getLeft())); + case PARQUET: + return readSchemaFromBaseFile(new Path(filePathWithFormat.getLeft())); + default: + throw new IllegalArgumentException("Unknown file format :" + filePathWithFormat.getRight() + + " for file " + filePathWithFormat.getLeft()); + } + } else { + return readSchemaFromLastCompaction(lastCompactionCommit); + } + default: + LOG.error("Unknown table type " + metaClient.getTableType()); + throw new InvalidTableException(metaClient.getBasePath()); + } + } catch (IOException e) { + throw new HoodieException("Failed to read data schema", e); + } + } + + private 
Schema getTableAvroSchemaFromDataFile() throws Exception { + return convertParquetSchemaToAvro(getTableParquetSchemaFromDataFile()); + } + + /** + * Gets full schema (user + metadata) for a hoodie table in Avro format. + * + * @return Avro schema for this table + * @throws Exception + */ + public Schema getTableAvroSchema() throws Exception { + Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(true); + return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() : getTableAvroSchemaFromDataFile(); + } + + /** + * Gets full schema (user + metadata) for a hoodie table in Parquet format. + * + * @return Parquet schema for the table + * @throws Exception + */ + public MessageType getTableParquetSchema() throws Exception { + Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(true); + return schemaFromCommitMetadata.isPresent() ? convertAvroSchemaToParquet(schemaFromCommitMetadata.get()) : + getTableParquetSchemaFromDataFile(); + } + + /** + * Gets users data schema for a hoodie table in Avro format. + * + * @return Avro user data schema + * @throws Exception + */ + public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception { + Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(false); + return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() : + HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile()); + } + + /** + * Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the last commit. 
+ * + * @return Avro schema for this table + */ + private Option<Schema> getTableSchemaFromCommitMetadata(boolean includeMetadataFields) { + try { + HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + byte[] data = timeline.getInstantDetails(timeline.lastInstant().get()).get(); + HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class); + String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY); + + if (StringUtils.isNullOrEmpty(existingSchemaStr)) { + return Option.empty(); + } + + Schema schema = new Schema.Parser().parse(existingSchemaStr); + if (includeMetadataFields) { + schema = HoodieAvroUtils.addMetadataFields(schema); + } + return Option.of(schema); + } catch (Exception e) { + throw new HoodieException("Failed to read schema from commit metadata", e); + } + } + + /** + * Convert a parquet scheme to the avro format. + * + * @param parquetSchema The parquet schema to convert + * @return The converted avro schema + */ + public Schema convertParquetSchemaToAvro(MessageType parquetSchema) { + AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getHadoopConf()); + return avroSchemaConverter.convert(parquetSchema); + } + + /** + * Convert a avro scheme to the parquet format. + * + * @param schema The avro schema to convert + * @return The converted parquet schema + */ + public MessageType convertAvroSchemaToParquet(Schema schema) { + AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getHadoopConf()); + return avroSchemaConverter.convert(schema); + } + + /** + * HUDI specific validation of schema evolution. Ensures that a newer schema can be used for the dataset by + * checking if the data written using the old schema can be read using the new schema. + * <p> + * HUDI requires a Schema to be specified in HoodieWriteConfig and is used by the HoodieWriteClient to + * create the records. 
The schema is also saved in the data files (parquet format) and log files (avro format). + * Since a schema is required each time new data is ingested into a HUDI dataset, schema can be evolved over time. + * <p> + * New Schema is compatible only if: + * A1. There is no change in schema + * A2. A field has been added and it has a default value specified + * <p> + * New Schema is incompatible if: + * B1. A field has been deleted + * B2. A field has been renamed (treated as delete + add) + * B3. A field's type has changed to be incompatible with the older type + * <p> + * Issue with org.apache.avro.SchemaCompatibility: + * org.apache.avro.SchemaCompatibility checks schema compatibility between a writer schema (which originally wrote + * the AVRO record) and a readerSchema (with which we are reading the record). It ONLY guarantees that each + * field in the reader record can be populated from the writer record. Hence, if the reader schema is missing a + * field, it is still compatible with the writer schema. + * <p> + * In other words, org.apache.avro.SchemaCompatibility was written to guarantee that we can read the data written + * earlier. It does not guarantee schema evolution for HUDI (B1 above). + * <p> + * Implementation: This function implements HUDI-specific checks (listed below) and defers the remaining + * checks to the org.apache.avro.SchemaCompatibility code. + * <p> + * Checks: + * C1. If there is no change in schema: success + * C2. If a field has been deleted in new schema: failure + * C3. If a field has been added in new schema: it should have default value specified + * C4. If a field has been renamed (treated as delete + add): failure + * C5. If a field type has changed: failure + * + * @param oldSchema Older schema to check. + * @param newSchema Newer schema to check. 
+ * @return True if the schema validation is successful + */ + public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) { + if (oldSchema.getType() == newSchema.getType() && newSchema.getType() == Schema.Type.RECORD) { + // record names must match: + if (!SchemaCompatibility.schemaNameEquals(oldSchema, newSchema)) { + return false; + } + + // Check that each field in the oldSchema can populated the newSchema + for (final Field oldSchemaField : oldSchema.getFields()) { + final Field newSchemaField = SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField); + if (newSchemaField == null) { + // C4 or C2: newSchema does not correspond to any field in the oldSchema + return false; + } else { + if (!isSchemaCompatible(oldSchemaField.schema(), newSchemaField.schema())) { + // C5: The fields do not have a compatible type + return false; + } + } + } + + // Check that new fields added in newSchema have default values as they will not be + // present in oldSchema and hence cannot be populated on reading records from existing data. 
+ for (final Field newSchemaField : newSchema.getFields()) { + final Field oldSchemaField = SchemaCompatibility.lookupWriterField(oldSchema, newSchemaField); + if (oldSchemaField == null) { + if (newSchemaField.defaultValue() == null) { + // C3: newly added field in newSchema does not have a default value + return false; + } + } + } + + // All fields in the newSchema record can be populated from the oldSchema record + return true; + } else { + // Use the checks implemented by + org.apache.avro.SchemaCompatibility.SchemaPairCompatibility compatResult = + org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(oldSchema, newSchema); + return compatResult.getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE; + } + } + + public static boolean isSchemaCompatible(String oldSchema, String newSchema) { + return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema)); + } + + /** + * Read the parquet schema from a parquet File. + */ + public MessageType readSchemaFromBaseFile(Path parquetFilePath) throws IOException { + LOG.info("Reading schema from " + parquetFilePath); + + FileSystem fs = metaClient.getRawFs(); + if (!fs.exists(parquetFilePath)) { + throw new IllegalArgumentException( + "Failed to read schema from data file " + parquetFilePath + ". File does not exist."); + } + ParquetMetadata fileFooter = + ParquetFileReader.readFooter(fs.getConf(), parquetFilePath, ParquetMetadataConverter.NO_FILTER); + return fileFooter.getFileMetaData().getSchema(); + } + + /** + * Read schema from a data file from the last compaction commit done. 
+ * + * @throws Exception + */ + public MessageType readSchemaFromLastCompaction(Option<HoodieInstant> lastCompactionCommitOpt) throws Exception { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + + HoodieInstant lastCompactionCommit = lastCompactionCommitOpt.orElseThrow(() -> new Exception( + "Could not read schema from last compaction, no compaction commits found on path " + metaClient)); + + // Read from the compacted file wrote + HoodieCommitMetadata compactionMetadata = HoodieCommitMetadata + .fromBytes(activeTimeline.getInstantDetails(lastCompactionCommit).get(), HoodieCommitMetadata.class); + String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny() + .orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction " + + lastCompactionCommit + ", could not get schema for table " + metaClient.getBasePath())); + return readSchemaFromBaseFile(new Path(filePath)); + } + + /** + * Read the schema from the log file on path. + * + * @return + */ + public MessageType readSchemaFromLogFile(Path path) throws IOException { + FileSystem fs = metaClient.getRawFs(); + Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null); + HoodieAvroDataBlock lastBlock = null; + while (reader.hasNext()) { + HoodieLogBlock block = reader.next(); + if (block instanceof HoodieAvroDataBlock) { + lastBlock = (HoodieAvroDataBlock) block; + } + } + reader.close(); + if (lastBlock != null) { + return new AvroSchemaConverter().convert(lastBlock.getSchema()); + } + return null; + } + + /** + * Read the schema from the log file on path. 
+ * + * @throws Exception + */ + public MessageType readSchemaFromLogFile(Option<HoodieInstant> lastCompactionCommitOpt, Path path) + throws Exception { + MessageType messageType = readSchemaFromLogFile(path); + // Fall back to read the schema from last compaction + if (messageType == null) { + LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt); + return readSchemaFromLastCompaction(lastCompactionCommitOpt); + } + return messageType; + } + + /** + * Read the schema from the log file on path. + * + * @return + */ + public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) throws IOException { + Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null); + HoodieAvroDataBlock lastBlock = null; + while (reader.hasNext()) { + HoodieLogBlock block = reader.next(); + if (block instanceof HoodieAvroDataBlock) { + lastBlock = (HoodieAvroDataBlock) block; + } + } + reader.close(); + if (lastBlock != null) { + return new AvroSchemaConverter().convert(lastBlock.getSchema()); + } + return null; + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java index 32ea71e..e9dddff 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java @@ -97,6 +97,10 @@ public class HoodieAvroUtils { || HoodieRecord.FILENAME_METADATA_FIELD.equals(fieldName); } + public static Schema createHoodieWriteSchema(Schema originalSchema) { + return HoodieAvroUtils.addMetadataFields(originalSchema); + } + /** * Adds the Hoodie metadata fields to the given schema. 
*/ @@ -177,6 +181,16 @@ public class HoodieAvroUtils { return newSchema; } + public static Schema removeMetadataFields(Schema schema) { + List<Schema.Field> filteredFields = schema.getFields() + .stream() + .filter(field -> !HoodieRecord.HOODIE_META_COLUMNS.contains(field.name())) + .collect(Collectors.toList()); + Schema filteredSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false); + filteredSchema.setFields(filteredFields); + return filteredSchema; + } + /** * Adds the Hoodie commit metadata into the provided Generic Record. */ diff --git a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala index a9e7389..9fca419 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -18,10 +18,9 @@ package org.apache.hudi import org.apache.hadoop.fs.GlobPattern -import org.apache.hadoop.fs.Path import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieTableType} -import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.common.util.ParquetUtils +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.util.HoodieAvroUtils import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieException import org.apache.hudi.table.HoodieTable @@ -35,11 +34,11 @@ import scala.collection.JavaConversions._ import scala.collection.mutable /** - * Relation, that implements the Hoodie incremental view. - * - * Implemented for Copy_on_write storage. - * - */ + * Relation, that implements the Hoodie incremental view. + * + * Implemented for Copy_on_write storage. 
+ * + */ class IncrementalRelation(val sqlContext: SQLContext, val basePath: String, val optParams: Map[String, String], @@ -47,8 +46,7 @@ class IncrementalRelation(val sqlContext: SQLContext, private val log = LogManager.getLogger(classOf[IncrementalRelation]) - val fs = new Path(basePath).getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true) + private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true) // MOR tables not supported yet if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) { throw new HoodieException("Incremental view not implemented yet, for merge-on-read tables") @@ -56,7 +54,7 @@ class IncrementalRelation(val sqlContext: SQLContext, // TODO : Figure out a valid HoodieWriteConfig private val hoodieTable = HoodieTable.getHoodieTable(metaClient, HoodieWriteConfig.newBuilder().withPath(basePath).build(), sqlContext.sparkContext) - val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants() + private val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants() if (commitTimeline.empty()) { throw new HoodieException("No instants to incrementally pull") } @@ -65,25 +63,21 @@ class IncrementalRelation(val sqlContext: SQLContext, s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY}") } - val lastInstant = commitTimeline.lastInstant().get() + private val lastInstant = commitTimeline.lastInstant().get() - val commitsToReturn = commitTimeline.findInstantsInRange( + private val commitsToReturn = commitTimeline.findInstantsInRange( optParams(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY), optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, lastInstant.getTimestamp)) .getInstants.iterator().toList - // use schema from a file produced in the latest instant - val latestSchema = { - // use last instant if 
instant range is empty - val instant = commitsToReturn.lastOption.getOrElse(lastInstant) - val latestMeta = HoodieCommitMetadata - .fromBytes(commitTimeline.getInstantDetails(instant).get, classOf[HoodieCommitMetadata]) - val metaFilePath = latestMeta.getFileIdAndFullPaths(basePath).values().iterator().next() - AvroConversionUtils.convertAvroSchemaToStructType(ParquetUtils.readAvroSchema( - sqlContext.sparkContext.hadoopConfiguration, new Path(metaFilePath))) + // use schema from latest metadata, if not present, read schema from the data file + private val latestSchema = { + val schemaUtil = new TableSchemaResolver(metaClient) + val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields); + AvroConversionUtils.convertAvroSchemaToStructType(tableSchema) } - val filters = { + private val filters = { if (optParams.contains(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY)) { val filterStr = optParams.getOrElse( DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY, @@ -106,7 +100,7 @@ class IncrementalRelation(val sqlContext: SQLContext, val pathGlobPattern = optParams.getOrElse( DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL) - val filteredFullPath = if(!pathGlobPattern.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)) { + val filteredFullPath = if (!pathGlobPattern.equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)) { val globMatcher = new GlobPattern("*" + pathGlobPattern) fileIdToFullPath.filter(p => globMatcher.matches(p._2)) } else { diff --git a/hudi-spark/src/test/scala/TestDataSource.scala b/hudi-spark/src/test/scala/TestDataSource.scala index 6bd32b4..ab44c76 100644 --- a/hudi-spark/src/test/scala/TestDataSource.scala +++ b/hudi-spark/src/test/scala/TestDataSource.scala @@ -126,6 +126,14 @@ class TestDataSource extends AssertionsForJUnit { assertEquals(1, countsPerCommit.length) assertEquals(firstCommit, countsPerCommit(0).get(0)) + 
// Upsert an empty dataFrame + val emptyRecords = DataSourceTestUtils.convertToStringList(dataGen.generateUpdates("002", 0)).toList + val emptyDF: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1)) + emptyDF.write.format("org.apache.hudi") + .options(commonOpts) + .mode(SaveMode.Append) + .save(basePath) + // pull the latest commit val hoodieIncViewDF2 = spark.read.format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
