This is an automated email from the ASF dual-hosted git repository.
wombatukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 96e234042b7 [HUDI-8430] Remove redundant configs in TestInsertTable
(#12202)
96e234042b7 is described below
commit 96e234042b79c1e61f89176282ba5a9f9ea62ad4
Author: Y Ethan Guo <[email protected]>
AuthorDate: Mon Nov 4 17:52:07 2024 -0800
[HUDI-8430] Remove redundant configs in TestInsertTable (#12202)
---
.../spark/sql/hudi/dml/TestInsertTable.scala | 148 ++++++++++-----------
1 file changed, 70 insertions(+), 78 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index d92b10b31e7..a731f9b8ebe 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -34,7 +34,6 @@ import
org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
-import
org.apache.spark.sql.hudi.command.HoodieSparkValidateDuplicateKeyRecordMerger
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
import
org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.getLastCommitMetadata
import org.junit.jupiter.api.Assertions.assertEquals
@@ -351,84 +350,77 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
}
- // TODO(HUDI-8430): revisit the explicit config setting of
recordMergeStrategyId
test("Test Insert Into Non Partitioned Table") {
- withRecordType(Seq(HoodieRecordType.AVRO, HoodieRecordType.SPARK),
Map(HoodieRecordType.SPARK ->
- // SparkMerger should use "HoodieSparkValidateDuplicateKeyRecordMerger"
- // with "hoodie.sql.insert.mode=strict"
- Map(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key ->
-
classOf[HoodieSparkValidateDuplicateKeyRecordMerger].getName)))(withTempDir {
tmp =>
- val tableName = generateTableName
- spark.sql(s"set hoodie.sql.insert.mode=strict")
- // Create none partitioned cow table
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}/$tableName'
- | tblproperties (
- | type = 'cow',
- | primaryKey = 'id',
- | preCombineField = 'ts',
- | recordMergeStrategyId =
'${HoodieSparkValidateDuplicateKeyRecordMerger.STRATEGY_ID}'
- | )
- """.stripMargin)
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "a1", 10.0, 1000)
- )
- spark.sql(s"insert into $tableName select 2, 'a2', 12, 1000")
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "a1", 10.0, 1000),
- Seq(2, "a2", 12.0, 1000)
- )
-
- assertThrows[HoodieDuplicateKeyException] {
- try {
- spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
- } catch {
- case e: Exception =>
- var root: Throwable = e
- while (root.getCause != null) {
- root = root.getCause
- }
- throw root
- }
- }
-
- // Create table with dropDup is true
- val tableName2 = generateTableName
- spark.sql("set hoodie.datasource.write.insert.drop.duplicates = true")
- spark.sql(
- s"""
- |create table $tableName2 (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}/$tableName2'
- | tblproperties (
- | type = 'mor',
- | primaryKey = 'id',
- | preCombineField = 'ts',
- | recordMergeStrategyId =
'${HoodieSparkValidateDuplicateKeyRecordMerger.STRATEGY_ID}'
- | )
- """.stripMargin)
- spark.sql(s"insert into $tableName2 select 1, 'a1', 10, 1000")
- // This record will be drop when dropDup is true
- spark.sql(s"insert into $tableName2 select 1, 'a1', 12, 1000")
- checkAnswer(s"select id, name, price, ts from $tableName2")(
- Seq(1, "a1", 10.0, 1000)
- )
- // disable this config to avoid affect other test in this class.
- spark.sql("set hoodie.datasource.write.insert.drop.duplicates = false")
- spark.sql(s"set hoodie.sql.insert.mode=upsert")
- })
+ withRecordType(Seq(HoodieRecordType.AVRO,
HoodieRecordType.SPARK))(withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(s"set hoodie.sql.insert.mode=strict")
+ // Create non-partitioned cow table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | type = 'cow',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 10.0, 1000)
+ )
+ spark.sql(s"insert into $tableName select 2, 'a2', 12, 1000")
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 10.0, 1000),
+ Seq(2, "a2", 12.0, 1000)
+ )
+
+ assertThrows[HoodieDuplicateKeyException] {
+ try {
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+ } catch {
+ case e: Exception =>
+ var root: Throwable = e
+ while (root.getCause != null) {
+ root = root.getCause
+ }
+ throw root
+ }
+ }
+
+ // Create table with dropDup is true
+ val tableName2 = generateTableName
+ spark.sql("set hoodie.datasource.write.insert.drop.duplicates = true")
+ spark.sql(
+ s"""
+ |create table $tableName2 (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName2'
+ | tblproperties (
+ | type = 'mor',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ spark.sql(s"insert into $tableName2 select 1, 'a1', 10, 1000")
+ // This record will be dropped when dropDup is true
+ spark.sql(s"insert into $tableName2 select 1, 'a1', 12, 1000")
+ checkAnswer(s"select id, name, price, ts from $tableName2")(
+ Seq(1, "a1", 10.0, 1000)
+ )
+ // Disable this config to avoid affecting other tests in this class.
+ spark.sql("set hoodie.datasource.write.insert.drop.duplicates = false")
+ spark.sql(s"set hoodie.sql.insert.mode=upsert")
+ })
}
test("Test Insert Into None Partitioned Table strict mode with no
preCombineField") {