This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a60522a6fab [HUDI-5514] Improving usability/performance with out of 
box default for append only use-cases (#8697)
a60522a6fab is described below

commit a60522a6fab074100164405c463d59aff7c4f5bd
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Aug 4 02:57:30 2023 -0400

    [HUDI-5514] Improving usability/performance with out of box default for 
append only use-cases (#8697)
    
    We recently added auto record key generation support, mainly to cater to 
append-only use-cases. As part of that work stream, this is a follow-up 
patch. It makes bulk_insert the default operation type for append-only 
use-cases. For regular pipelines, "upsert" will remain the default operation.
    
    With this patch, users can now do the following with Hudi.
    
    a. non-partitioned: 
df.write.format("hudi").option("hoodie.table.name","hudi_tbl").save(path) -> 
will use bulk_insert.
    b. partitioned dataset: 
df.write.partitionBy(colA).format("hudi").option("hoodie.table.name","hudi_tbl").save(path)
 -> will use bulk_insert.
    c. multi-field partitioning: df.write.partitionBy(colA, 
colB).format("hudi").option("hoodie.table.name","hudi_tbl").save(path)  -> will 
use bulk_insert.
    
    If either of the configs (record key or operation) is explicitly set, the 
default operation is chosen to be "upsert", which has been the behavior so far.
---
 .../apache/hudi/AutoRecordKeyGenerationUtils.scala |  9 ++++-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 41 ++++++++++++-------
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      | 26 ++++++------
 .../TestAutoGenerationOfRecordKeys.scala           | 16 ++++++++
 .../apache/spark/sql/hudi/TestCreateTable.scala    | 46 ++++++++++++++++++++++
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 41 ++++++++++++++++++-
 6 files changed, 152 insertions(+), 27 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
index 9b3a10a3f62..ca679acc799 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
@@ -19,14 +19,17 @@
 
 package org.apache.hudi
 
-import org.apache.hudi.DataSourceWriteOptions.INSERT_DROP_DUPS
+import org.apache.hudi.DataSourceWriteOptions.{INSERT_DROP_DUPS, 
PRECOMBINE_FIELD}
+import org.apache.hudi.HoodieSparkSqlWriter.getClass
 import org.apache.hudi.common.config.HoodieConfig
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieKeyGeneratorException
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.slf4j.LoggerFactory
 
 object AutoRecordKeyGenerationUtils {
+  private val log = LoggerFactory.getLogger(getClass)
 
   def mayBeValidateParamsForAutoGenerationOfRecordKeys(parameters: Map[String, 
String], hoodieConfig: HoodieConfig): Unit = {
     val autoGenerateRecordKeys = 
!parameters.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) // if 
record key is not configured,
@@ -46,5 +49,9 @@ object AutoRecordKeyGenerationUtils {
         throw new HoodieKeyGeneratorException("Disabling " + 
HoodieTableConfig.POPULATE_META_FIELDS.key() + " is not supported with auto 
generation of record keys")
       }
     }
+
+    if (hoodieConfig.contains(PRECOMBINE_FIELD.key())) {
+      log.warn("Precombine field " + 
hoodieConfig.getString(PRECOMBINE_FIELD.key()) + " will be ignored with auto 
record key generation enabled")
+    }
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 07b16e1e47d..c7d1a4979bb 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -157,20 +157,7 @@ object HoodieSparkSqlWriter {
       case _ => throw new HoodieException("hoodie only support 
org.apache.spark.serializer.KryoSerializer as spark.serializer")
     }
     val tableType = HoodieTableType.valueOf(hoodieConfig.getString(TABLE_TYPE))
-    var operation = 
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
-    // TODO clean up
-    // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS 
is true
-    // Auto-correct the operation to "insert" if OPERATION is set to "upsert" 
wrongly
-    // or not set (in which case it will be set as "upsert" by 
parametersWithWriteDefaults()) .
-    if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
-      operation == WriteOperationType.UPSERT) {
-
-      log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
-        s"when $INSERT_DROP_DUPS is set to be true, " +
-        s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
-
-      operation = WriteOperationType.INSERT
-    }
+    val operation = deduceOperation(hoodieConfig, paramsWithoutDefaults)
 
     val jsc = new JavaSparkContext(sparkContext)
     if (asyncCompactionTriggerFn.isDefined) {
@@ -428,6 +415,32 @@ object HoodieSparkSqlWriter {
     }
   }
 
+  def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults : 
Map[String, String]): WriteOperationType = {
+    var operation = 
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+    // TODO clean up
+    // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS 
is true
+    // Auto-correct the operation to "insert" if OPERATION is set to "upsert" 
wrongly
+    // or not set (in which case it will be set as "upsert" by 
parametersWithWriteDefaults()) .
+    if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
+      operation == WriteOperationType.UPSERT) {
+
+      log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
+        s"when $INSERT_DROP_DUPS is set to be true, " +
+        s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
+
+      operation = WriteOperationType.INSERT
+      operation
+    } else {
+      // if no record key, we should treat it as append only workload and make 
bulk_insert as operation type.
+      if 
(!paramsWithoutDefaults.containsKey(DataSourceWriteOptions.RECORDKEY_FIELD.key())
+        && !paramsWithoutDefaults.containsKey(OPERATION.key())) {
+        log.warn(s"Choosing BULK_INSERT as the operation type since auto 
record key generation is applicable")
+        operation = WriteOperationType.BULK_INSERT
+      }
+      operation
+    }
+  }
+
   /**
    * Deduces writer's schema based on
    * <ul>
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 7e2cd9ef308..c66dcc19549 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -28,6 +28,7 @@ import org.apache.hudi.config.{HoodieIndexConfig, 
HoodieInternalConfig, HoodieWr
 import org.apache.hudi.hive.ddl.HiveSyncMode
 import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder, 
MultiPartKeysValueExtractor}
 import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.sql.InsertMode
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.spark.internal.Logging
@@ -122,27 +123,29 @@ trait ProvidesHoodieConfig extends Logging {
    */
   private def deduceOperation(enableBulkInsert: Boolean, isOverwritePartition: 
Boolean, isOverwriteTable: Boolean,
                               dropDuplicate: Boolean, isNonStrictMode: 
Boolean, isPartitionedTable: Boolean,
-                              combineBeforeInsert: Boolean, insertMode: 
InsertMode): String = {
-    (enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate, 
isNonStrictMode, isPartitionedTable) match {
-      case (true, _, _, _, false, _) =>
+                              combineBeforeInsert: Boolean, insertMode: 
InsertMode, autoGenerateRecordKeys: Boolean): String = {
+    (enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate, 
isNonStrictMode, isPartitionedTable, autoGenerateRecordKeys) match {
+      case (true, _, _, _, false, _, _) =>
         throw new IllegalArgumentException(s"Table with primaryKey can not use 
bulk insert in ${insertMode.value()} mode.")
-      case (true, _, _, true, _, _) =>
+      case (true, _, _, true, _, _, _) =>
         throw new IllegalArgumentException(s"Bulk insert cannot support drop 
duplication." +
           s" Please disable $INSERT_DROP_DUPS and try again.")
       // Bulk insert with overwrite table
-      case (true, false, true, _, _, _) =>
+      case (true, false, true, _, _, _, _) =>
         BULK_INSERT_OPERATION_OPT_VAL
       // Bulk insert with overwrite table partition
-      case (true, true, false, _, _, true) =>
+      case (true, true, false, _, _, true, _) =>
         BULK_INSERT_OPERATION_OPT_VAL
       // insert overwrite table
-      case (false, false, true, _, _, _) => 
INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+      case (false, false, true, _, _, _, _) => 
INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
       // insert overwrite partition
-      case (false, true, false, _, _, true) => 
INSERT_OVERWRITE_OPERATION_OPT_VAL
+      case (false, true, false, _, _, true, _) => 
INSERT_OVERWRITE_OPERATION_OPT_VAL
       // disable dropDuplicate, and provide preCombineKey, use the upsert 
operation for strict and upsert mode.
-      case (false, false, false, false, false, _) if combineBeforeInsert => 
UPSERT_OPERATION_OPT_VAL
+      case (false, false, false, false, false, _, _) if combineBeforeInsert => 
UPSERT_OPERATION_OPT_VAL
       // if table is pk table and has enableBulkInsert use bulk insert for 
non-strict mode.
-      case (true, false, false, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
+      case (true, false, false, _, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL
+      // if auto record key generation is enabled, use bulk_insert
+      case (_, _, _, _, _,_,true) => BULK_INSERT_OPERATION_OPT_VAL
       // for the rest case, use the insert operation
       case _ => INSERT_OPERATION_OPT_VAL
     }
@@ -190,6 +193,7 @@ trait ProvidesHoodieConfig extends Logging {
       DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
     val dropDuplicate = sparkSession.conf
       
.getOption(INSERT_DROP_DUPS.key).getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean
+    val autoGenerateRecordKeys : Boolean = 
!combinedOpts.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
 
     val insertMode = 
InsertMode.of(combinedOpts.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
       DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
@@ -214,7 +218,7 @@ trait ProvidesHoodieConfig extends Logging {
         // NOTE: Target operation could be overridden by the user, therefore 
if it has been provided as an input
         //       we'd prefer that value over auto-deduced operation. 
Otherwise, we deduce target operation type
         deduceOperation(enableBulkInsert, isOverwritePartition, 
isOverwriteTable, dropDuplicate,
-          isNonStrictMode, isPartitionedTable, combineBeforeInsert, insertMode)
+          isNonStrictMode, isPartitionedTable, combineBeforeInsert, 
insertMode, autoGenerateRecordKeys)
       } else {
         deduceSqlWriteOperation(isOverwritePartition, isOverwriteTable, 
sqlWriteOperation)
       }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
index 3f737c9dac3..89a232b5f99 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
@@ -261,4 +261,20 @@ class TestAutoGenerationOfRecordKeys extends 
HoodieSparkClientTestBase with Scal
     val expectedMsg = s"RecordKey:\t_row_key\tnull"
     assertTrue(getRootCause(e).getMessage.contains(expectedMsg))
   }
+
+  @Test
+  def testWriteToHudiWithoutAnyConfigs(): Unit = {
+    val records = recordsToStrings(dataGen.generateInserts("000", 5)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF.cache
+
+    inputDF.write.format("hudi")
+      .option("hoodie.table.name","hudi_tbl")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+    val snapshot0 = spark.read.format("hudi").load(basePath)
+    assertEquals(5, snapshot0.count())
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 7adbb23dd02..fdf552be655 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -405,6 +405,52 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Create Table As Select With Auto record key gen") {
+    withTempDir { tmp =>
+      // Create Non-Partitioned table
+      val tableName1 = generateTableName
+      spark.sql(
+        s"""
+           | create table $tableName1 using hudi
+           | tblproperties(
+           |    type = 'cow'
+           | )
+           | location '${tmp.getCanonicalPath}/$tableName1'
+           | AS
+           | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+       """.stripMargin)
+
+      assertResult(WriteOperationType.BULK_INSERT) {
+        getLastCommitMetadata(spark, 
s"${tmp.getCanonicalPath}/$tableName1").getOperationType
+      }
+      checkAnswer(s"select id, name, price, ts from $tableName1")(
+        Seq(1, "a1", 10.0, 1000)
+      )
+
+      // Create Partitioned table
+      val tableName2 = generateTableName
+      spark.sql(
+        s"""
+           | create table $tableName2 using hudi
+           | partitioned by (dt)
+           | tblproperties(
+           |    type = 'cow'
+           | )
+           | location '${tmp.getCanonicalPath}/$tableName2'
+           | AS
+           | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt
+         """.stripMargin
+      )
+
+      assertResult(WriteOperationType.BULK_INSERT) {
+        getLastCommitMetadata(spark, 
s"${tmp.getCanonicalPath}/$tableName2").getOperationType
+      }
+      checkAnswer(s"select id, name, price, dt from $tableName2")(
+        Seq(1, "a1", 10, "2021-04-01")
+      )
+    }
+  }
+
   test("Test Create ro/rt Table In The Right Way") {
     withTempDir { tmp =>
       val parentPath = tmp.getCanonicalPath
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 81cdff27e35..f80906e9d7b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -1565,9 +1565,48 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
         Seq(2, "a2", 20.0, 2000, "2021-01-06"),
         Seq(3, "a3", 30.0, 3000, "2021-01-07")
       )
-
       val df = spark.read.format("hudi").load(tmp.getCanonicalPath)
       assertEquals(3, 
df.select(HoodieRecord.RECORD_KEY_METADATA_FIELD).count())
+      assertResult(WriteOperationType.BULK_INSERT) {
+        getLastCommitMetadata(spark, 
s"${tmp.getCanonicalPath}").getOperationType
+      }
+    }
+  }
+
+  test("Test Insert Into with auto generate record keys with precombine ") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Create a partitioned table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  dt string,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | partitioned by (dt)
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  type = 'cow',
+           |  preCombineField = 'price'
+           | )
+       """.stripMargin)
+
+      spark.sql(
+        s"""
+           | insert into $tableName values
+           | (1, 'a1', 10, 1000, "2021-01-05"),
+           | (2, 'a2', 20, 2000, "2021-01-06"),
+           | (3, 'a3', 30, 3000, "2021-01-07")
+              """.stripMargin)
+
+      checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+        Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+        Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+        Seq(3, "a3", 30.0, 3000, "2021-01-07")
+      )
     }
   }
 

Reply via email to