nsivabalan commented on code in PR #13699:
URL: https://github.com/apache/hudi/pull/13699#discussion_r2279120259
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable2.scala:
##########
@@ -838,72 +839,72 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
}
}
- test("Test only insert for source table in dup key without preCombineField")
{
- spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} =
${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.defaultValue()}")
- Seq("cow", "mor").foreach {
- tableType => {
- withTempDir { tmp =>
- val tableName = generateTableName
- spark.sql(
- s"""
- | create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts int,
- | dt string
- | ) using hudi
- | tblproperties (
- | type = '$tableType',
- | primaryKey = 'id'
- | )
- | partitioned by(dt)
- | location '${tmp.getCanonicalPath}'
- """.stripMargin)
- // append records to small file is use update bucket, set this conf
use concat handler
- spark.sql("set hoodie.merge.allow.duplicate.on.inserts = true")
-
- // Insert data without matched condition
- spark.sql(
- s"""
- | merge into $tableName as t0
- | using (
- | select 1 as id, 'a1' as name, 10.1 as price, 1000 as ts,
'2021-03-21' as dt
- | union all
- | select 1 as id, 'a2' as name, 10.2 as price, 1002 as ts,
'2021-03-21' as dt
- | ) as s0
- | on t0.id = s0.id
- | when not matched then insert *
- """.stripMargin
- )
- checkAnswer(s"select id, name, price, ts, dt from $tableName")(
- Seq(1, "a1", 10.1, 1000, "2021-03-21"),
- Seq(1, "a2", 10.2, 1002, "2021-03-21")
- )
-
- // Insert data with matched condition
- spark.sql(
- s"""
- | merge into $tableName as t0
- | using (
- | select 3 as id, 'a3' as name, 10.3 as price, 1003 as ts,
'2021-03-21' as dt
- | union all
- | select 1 as id, 'a2' as name, 10.4 as price, 1004 as ts,
'2021-03-21' as dt
- | ) as s0
- | on t0.id = s0.id
- | when matched then update set *
- | when not matched then insert *
- """.stripMargin
- )
- checkAnswer(s"select id, name, price, ts, dt from $tableName")(
- Seq(1, "a2", 10.4, 1004, "2021-03-21"),
- Seq(1, "a2", 10.4, 1004, "2021-03-21"),
- Seq(3, "a3", 10.3, 1003, "2021-03-21")
- )
- }
- }
- }
- }
+// test("Test only insert for source table in dup key without
preCombineField") {
Review Comment:
For mutable streams, I agree that duplicates should not be allowed. But for
`inserts` or immutable workloads, we have the concat handler, which is expected
to retain duplicates. So, we can't blindly shut off all duplicates.
It's tough to guard this case; in other words, it may not be easy to allow
duplicates in this special case.
For example, with small-file handling, we could route updates + inserts to an
existing file group, which will exercise the FileGroupReaderBasedMergeHandle
after this patch. So, if the set of inserts has duplicates, we might dedup
them now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]