This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a60522a6fab [HUDI-5514] Improving usability/performance with out of
box default for append only use-cases (#8697)
a60522a6fab is described below
commit a60522a6fab074100164405c463d59aff7c4f5bd
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Aug 4 02:57:30 2023 -0400
[HUDI-5514] Improving usability/performance with out of box default for
append only use-cases (#8697)
We recently added auto record key generation support, mainly to cater to
append-only use-cases. This patch is a follow-up in that work stream.
It makes bulk_insert the default operation type for append-only
use-cases. For regular pipelines, "upsert" will remain the default operation.
With this patch, users can now do the following w/ hudi.
a. non-partitioned:
df.write.format("hudi").option("hoodie.table.name","hudi_tbl").save(path) ->
will use bulk_insert.
b. partitioned dataset:
df.write.partitionBy(colA).format("hudi").option("hoodie.table.name","hudi_tbl").save(path)
-> will use bulk_insert.
c. multi-field partitioning: df.write.partitionBy(colA,
colB).format("hudi").option("hoodie.table.name","hudi_tbl").save(path) -> will
use bulk_insert.
If either of the configs (record key or operation) is explicitly set, the default
operation is chosen to be "upsert", which has been the behavior so far.
---
.../apache/hudi/AutoRecordKeyGenerationUtils.scala | 9 ++++-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 41 ++++++++++++-------
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 26 ++++++------
.../TestAutoGenerationOfRecordKeys.scala | 16 ++++++++
.../apache/spark/sql/hudi/TestCreateTable.scala | 46 ++++++++++++++++++++++
.../apache/spark/sql/hudi/TestInsertTable.scala | 41 ++++++++++++++++++-
6 files changed, 152 insertions(+), 27 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
index 9b3a10a3f62..ca679acc799 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AutoRecordKeyGenerationUtils.scala
@@ -19,14 +19,17 @@
package org.apache.hudi
-import org.apache.hudi.DataSourceWriteOptions.INSERT_DROP_DUPS
+import org.apache.hudi.DataSourceWriteOptions.{INSERT_DROP_DUPS,
PRECOMBINE_FIELD}
+import org.apache.hudi.HoodieSparkSqlWriter.getClass
import org.apache.hudi.common.config.HoodieConfig
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieKeyGeneratorException
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.slf4j.LoggerFactory
object AutoRecordKeyGenerationUtils {
+ private val log = LoggerFactory.getLogger(getClass)
def mayBeValidateParamsForAutoGenerationOfRecordKeys(parameters: Map[String,
String], hoodieConfig: HoodieConfig): Unit = {
val autoGenerateRecordKeys =
!parameters.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()) // if
record key is not configured,
@@ -46,5 +49,9 @@ object AutoRecordKeyGenerationUtils {
throw new HoodieKeyGeneratorException("Disabling " +
HoodieTableConfig.POPULATE_META_FIELDS.key() + " is not supported with auto
generation of record keys")
}
}
+
+ if (hoodieConfig.contains(PRECOMBINE_FIELD.key())) {
+ log.warn("Precombine field " +
hoodieConfig.getString(PRECOMBINE_FIELD.key()) + " will be ignored with auto
record key generation enabled")
+ }
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 07b16e1e47d..c7d1a4979bb 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -157,20 +157,7 @@ object HoodieSparkSqlWriter {
case _ => throw new HoodieException("hoodie only support
org.apache.spark.serializer.KryoSerializer as spark.serializer")
}
val tableType = HoodieTableType.valueOf(hoodieConfig.getString(TABLE_TYPE))
- var operation =
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
- // TODO clean up
- // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS
is true
- // Auto-correct the operation to "insert" if OPERATION is set to "upsert"
wrongly
- // or not set (in which case it will be set as "upsert" by
parametersWithWriteDefaults()) .
- if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
- operation == WriteOperationType.UPSERT) {
-
- log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
- s"when $INSERT_DROP_DUPS is set to be true, " +
- s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
-
- operation = WriteOperationType.INSERT
- }
+ val operation = deduceOperation(hoodieConfig, paramsWithoutDefaults)
val jsc = new JavaSparkContext(sparkContext)
if (asyncCompactionTriggerFn.isDefined) {
@@ -428,6 +415,32 @@ object HoodieSparkSqlWriter {
}
}
+ def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults :
Map[String, String]): WriteOperationType = {
+ var operation =
WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+ // TODO clean up
+ // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS
is true
+ // Auto-correct the operation to "insert" if OPERATION is set to "upsert"
wrongly
+ // or not set (in which case it will be set as "upsert" by
parametersWithWriteDefaults()) .
+ if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
+ operation == WriteOperationType.UPSERT) {
+
+ log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
+ s"when $INSERT_DROP_DUPS is set to be true, " +
+ s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
+
+ operation = WriteOperationType.INSERT
+ operation
+ } else {
+ // if no record key, we should treat it as append only workload and make
bulk_insert as operation type.
+ if
(!paramsWithoutDefaults.containsKey(DataSourceWriteOptions.RECORDKEY_FIELD.key())
+ && !paramsWithoutDefaults.containsKey(OPERATION.key())) {
+ log.warn(s"Choosing BULK_INSERT as the operation type since auto
record key generation is applicable")
+ operation = WriteOperationType.BULK_INSERT
+ }
+ operation
+ }
+ }
+
/**
* Deduces writer's schema based on
* <ul>
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 7e2cd9ef308..c66dcc19549 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -28,6 +28,7 @@ import org.apache.hudi.config.{HoodieIndexConfig,
HoodieInternalConfig, HoodieWr
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder,
MultiPartKeysValueExtractor}
import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.spark.internal.Logging
@@ -122,27 +123,29 @@ trait ProvidesHoodieConfig extends Logging {
*/
private def deduceOperation(enableBulkInsert: Boolean, isOverwritePartition:
Boolean, isOverwriteTable: Boolean,
dropDuplicate: Boolean, isNonStrictMode:
Boolean, isPartitionedTable: Boolean,
- combineBeforeInsert: Boolean, insertMode:
InsertMode): String = {
- (enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate,
isNonStrictMode, isPartitionedTable) match {
- case (true, _, _, _, false, _) =>
+ combineBeforeInsert: Boolean, insertMode:
InsertMode, autoGenerateRecordKeys: Boolean): String = {
+ (enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate,
isNonStrictMode, isPartitionedTable, autoGenerateRecordKeys) match {
+ case (true, _, _, _, false, _, _) =>
throw new IllegalArgumentException(s"Table with primaryKey can not use
bulk insert in ${insertMode.value()} mode.")
- case (true, _, _, true, _, _) =>
+ case (true, _, _, true, _, _, _) =>
throw new IllegalArgumentException(s"Bulk insert cannot support drop
duplication." +
s" Please disable $INSERT_DROP_DUPS and try again.")
// Bulk insert with overwrite table
- case (true, false, true, _, _, _) =>
+ case (true, false, true, _, _, _, _) =>
BULK_INSERT_OPERATION_OPT_VAL
// Bulk insert with overwrite table partition
- case (true, true, false, _, _, true) =>
+ case (true, true, false, _, _, true, _) =>
BULK_INSERT_OPERATION_OPT_VAL
// insert overwrite table
- case (false, false, true, _, _, _) =>
INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+ case (false, false, true, _, _, _, _) =>
INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
// insert overwrite partition
- case (false, true, false, _, _, true) =>
INSERT_OVERWRITE_OPERATION_OPT_VAL
+ case (false, true, false, _, _, true, _) =>
INSERT_OVERWRITE_OPERATION_OPT_VAL
// disable dropDuplicate, and provide preCombineKey, use the upsert
operation for strict and upsert mode.
- case (false, false, false, false, false, _) if combineBeforeInsert =>
UPSERT_OPERATION_OPT_VAL
+ case (false, false, false, false, false, _, _) if combineBeforeInsert =>
UPSERT_OPERATION_OPT_VAL
// if table is pk table and has enableBulkInsert use bulk insert for
non-strict mode.
- case (true, false, false, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
+ case (true, false, false, _, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL
+ // if auto record key generation is enabled, use bulk_insert
+ case (_, _, _, _, _,_,true) => BULK_INSERT_OPERATION_OPT_VAL
// for the rest case, use the insert operation
case _ => INSERT_OPERATION_OPT_VAL
}
@@ -190,6 +193,7 @@ trait ProvidesHoodieConfig extends Logging {
DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
val dropDuplicate = sparkSession.conf
.getOption(INSERT_DROP_DUPS.key).getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean
+ val autoGenerateRecordKeys : Boolean =
!combinedOpts.contains(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
val insertMode =
InsertMode.of(combinedOpts.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
@@ -214,7 +218,7 @@ trait ProvidesHoodieConfig extends Logging {
// NOTE: Target operation could be overridden by the user, therefore
if it has been provided as an input
// we'd prefer that value over auto-deduced operation.
Otherwise, we deduce target operation type
deduceOperation(enableBulkInsert, isOverwritePartition,
isOverwriteTable, dropDuplicate,
- isNonStrictMode, isPartitionedTable, combineBeforeInsert, insertMode)
+ isNonStrictMode, isPartitionedTable, combineBeforeInsert,
insertMode, autoGenerateRecordKeys)
} else {
deduceSqlWriteOperation(isOverwritePartition, isOverwriteTable,
sqlWriteOperation)
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
index 3f737c9dac3..89a232b5f99 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
@@ -261,4 +261,20 @@ class TestAutoGenerationOfRecordKeys extends
HoodieSparkClientTestBase with Scal
val expectedMsg = s"RecordKey:\t_row_key\tnull"
assertTrue(getRootCause(e).getMessage.contains(expectedMsg))
}
+
+ @Test
+ def testWriteToHudiWithoutAnyConfigs(): Unit = {
+ val records = recordsToStrings(dataGen.generateInserts("000", 5)).toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+ inputDF.cache
+
+ inputDF.write.format("hudi")
+ .option("hoodie.table.name","hudi_tbl")
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+ val snapshot0 = spark.read.format("hudi").load(basePath)
+ assertEquals(5, snapshot0.count())
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 7adbb23dd02..fdf552be655 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -405,6 +405,52 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
}
}
+ test("Test Create Table As Select With Auto record key gen") {
+ withTempDir { tmp =>
+ // Create Non-Partitioned table
+ val tableName1 = generateTableName
+ spark.sql(
+ s"""
+ | create table $tableName1 using hudi
+ | tblproperties(
+ | type = 'cow'
+ | )
+ | location '${tmp.getCanonicalPath}/$tableName1'
+ | AS
+ | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+ """.stripMargin)
+
+ assertResult(WriteOperationType.BULK_INSERT) {
+ getLastCommitMetadata(spark,
s"${tmp.getCanonicalPath}/$tableName1").getOperationType
+ }
+ checkAnswer(s"select id, name, price, ts from $tableName1")(
+ Seq(1, "a1", 10.0, 1000)
+ )
+
+ // Create Partitioned table
+ val tableName2 = generateTableName
+ spark.sql(
+ s"""
+ | create table $tableName2 using hudi
+ | partitioned by (dt)
+ | tblproperties(
+ | type = 'cow'
+ | )
+ | location '${tmp.getCanonicalPath}/$tableName2'
+ | AS
+ | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt
+ """.stripMargin
+ )
+
+ assertResult(WriteOperationType.BULK_INSERT) {
+ getLastCommitMetadata(spark,
s"${tmp.getCanonicalPath}/$tableName2").getOperationType
+ }
+ checkAnswer(s"select id, name, price, dt from $tableName2")(
+ Seq(1, "a1", 10, "2021-04-01")
+ )
+ }
+ }
+
test("Test Create ro/rt Table In The Right Way") {
withTempDir { tmp =>
val parentPath = tmp.getCanonicalPath
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 81cdff27e35..f80906e9d7b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -1565,9 +1565,48 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
Seq(2, "a2", 20.0, 2000, "2021-01-06"),
Seq(3, "a3", 30.0, 3000, "2021-01-07")
)
-
val df = spark.read.format("hudi").load(tmp.getCanonicalPath)
assertEquals(3,
df.select(HoodieRecord.RECORD_KEY_METADATA_FIELD).count())
+ assertResult(WriteOperationType.BULK_INSERT) {
+ getLastCommitMetadata(spark,
s"${tmp.getCanonicalPath}").getOperationType
+ }
+ }
+ }
+
+ test("Test Insert Into with auto generate record keys with precombine ") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // Create a partitioned table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | dt string,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}'
+ | tblproperties (
+ | type = 'cow',
+ | preCombineField = 'price'
+ | )
+ """.stripMargin)
+
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1, 'a1', 10, 1000, "2021-01-05"),
+ | (2, 'a2', 20, 2000, "2021-01-06"),
+ | (3, 'a3', 30, 3000, "2021-01-07")
+ """.stripMargin)
+
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+ Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+ Seq(3, "a3", 30.0, 3000, "2021-01-07")
+ )
}
}