This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d5444ff  [HUDI-3018] Adding validation to dataframe scheme to ensure 
reserved field does not have diff data type (#4852)
d5444ff is described below

commit d5444ff7ff832a2e14b5c78449713fdf82bcaec4
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sun Feb 27 11:59:23 2022 -0500

    [HUDI-3018] Adding validation to dataframe scheme to ensure reserved field 
does not have diff data type (#4852)
---
 .../org/apache/hudi/AvroConversionUtils.scala      |  1 -
 .../org/apache/hudi/common/model/HoodieRecord.java |  1 +
 .../model/OverwriteWithLatestAvroPayload.java      |  2 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 16 ++++++++-----
 .../apache/hudi/functional/TestCOWDataSource.scala | 26 +++++++++++++++++-----
 5 files changed, 33 insertions(+), 13 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index a006eeb..62bcbf6 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -17,7 +17,6 @@
  */
 
 package org.apache.hudi
-
 import org.apache.avro.Schema.Type
 import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, 
IndexedRecord}
 import org.apache.avro.{AvroRuntimeException, JsonProperties, Schema}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index f90448e..ac30766 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -40,6 +40,7 @@ public abstract class HoodieRecord<T> implements Serializable 
{
   public static final String PARTITION_PATH_METADATA_FIELD = 
"_hoodie_partition_path";
   public static final String FILENAME_METADATA_FIELD = "_hoodie_file_name";
   public static final String OPERATION_METADATA_FIELD = "_hoodie_operation";
+  public static final String HOODIE_IS_DELETED = "_hoodie_is_deleted";
 
   public static final List<String> HOODIE_META_COLUMNS =
       CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, 
COMMIT_SEQNO_METADATA_FIELD,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
index 4be2e3e..7b7bd6c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
@@ -85,7 +85,7 @@ public class OverwriteWithLatestAvroPayload extends 
BaseAvroPayload
    * @returns {@code true} if record represents a delete record. {@code false} 
otherwise.
    */
   protected boolean isDeleteRecord(GenericRecord genericRecord) {
-    final String isDeleteKey = "_hoodie_is_deleted";
+    final String isDeleteKey = HoodieRecord.HOODIE_IS_DELETED;
     // Modify to be compatible with new version Avro.
     // The new version Avro throws for GenericRecord.get if the field name
     // does not exist in the schema.
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index b7f04c5..89304d3 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -19,11 +19,10 @@ package org.apache.hudi
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
-
+import org.apache.avro.reflect.AvroSchema
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
-
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.avro.HoodieAvroUtils
@@ -45,9 +44,7 @@ import 
org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKey
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.sync.common.AbstractSyncTool
 import org.apache.hudi.table.BulkInsertPartitioner
-
 import org.apache.log4j.LogManager
-
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.hive.HiveExternalCatalog
@@ -58,7 +55,6 @@ import org.apache.spark.sql._
 import org.apache.spark.SparkContext
 
 import java.util.Properties
-
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
@@ -242,6 +238,7 @@ object HoodieSparkSqlWriter {
             if (reconcileSchema) {
               schema = getLatestTableSchema(fs, basePath, sparkContext, schema)
             }
+            validateSchemaForHoodieIsDeleted(schema)
             sparkContext.getConf.registerAvroSchemas(schema)
             log.info(s"Registered avro schema : ${schema.toString(true)}")
 
@@ -432,6 +429,14 @@ object HoodieSparkSqlWriter {
     }
   }
 
+  def validateSchemaForHoodieIsDeleted(schema: Schema): Unit = {
+    if (schema.getField(HoodieRecord.HOODIE_IS_DELETED) != null &&
+      
AvroConversionUtils.resolveAvroTypeNullability(schema.getField(HoodieRecord.HOODIE_IS_DELETED).schema())._2.getType
 != Schema.Type.BOOLEAN) {
+      throw new HoodieException(HoodieRecord.HOODIE_IS_DELETED + " has to be 
BOOLEAN type. Passed in dataframe's schema has type "
+        + schema.getField(HoodieRecord.HOODIE_IS_DELETED).schema().getType)
+    }
+  }
+
   def bulkInsertAsRow(sqlContext: SQLContext,
                       parameters: Map[String, String],
                       df: DataFrame,
@@ -454,6 +459,7 @@ object HoodieSparkSqlWriter {
     if (dropPartitionColumns) {
       schema = generateSchemaWithoutPartitionColumns(partitionColumns, schema)
     }
+    validateSchemaForHoodieIsDeleted(schema)
     sparkContext.getConf.registerAvroSchemas(schema)
     log.info(s"Registered avro schema : ${schema.toString(true)}")
     if (parameters(INSERT_DROP_DUPS.key).toBoolean) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 58b36f8..96d50f6 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -18,23 +18,21 @@
 package org.apache.hudi.functional
 
 import org.apache.hadoop.fs.FileSystem
-
 import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import 
org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, 
recordsToStrings}
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.exception.HoodieUpsertException
+import org.apache.hudi.exception.{HoodieException, HoodieUpsertException}
 import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
 import org.apache.hudi.testutils.HoodieClientTestBase
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceWriteOptions, HoodieDataSourceHelpers}
-
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieMergeOnReadRDD}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, concat, lit, udf}
 import org.apache.spark.sql.types._
-
 import org.joda.time.DateTime
 import org.joda.time.format.DateTimeFormat
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue, fail}
@@ -44,7 +42,6 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
 
 import java.sql.{Date, Timestamp}
-
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
@@ -98,6 +95,23 @@ class TestCOWDataSource extends HoodieClientTestBase {
     assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
   }
 
+
+  @Test def testHoodieIsDeletedNonBooleanField() {
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    val df = inputDF.withColumn(HoodieRecord.HOODIE_IS_DELETED, lit("abc"))
+
+    assertThrows(classOf[HoodieException], new Executable {
+      override def execute(): Unit = {
+        df.write.format("hudi")
+          .options(commonOpts)
+          .mode(SaveMode.Overwrite)
+          .save(basePath)
+      }
+    }, "Should have failed since _hoodie_is_deleted is not a BOOLEAN data 
type")
+  }
+
   /**
    * This tests the case that query by with a specified partition condition on 
hudi table which is
    * different between the value of the partition field and the actual 
partition path,

Reply via email to