[ 
https://issues.apache.org/jira/browse/HUDI-8628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davis Zhang updated HUDI-8628:
------------------------------
    Description: 
spark.sql(s"set ${HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
spark.sql(s"set ${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
spark.sql(s"set ${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat")
spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = false")

spark.sql(
s"""
|create table $tableName (
|id int,
|name string,
|price long,
|ts long,
|description string
|) using hudi
|tblproperties(
|type ='$tableType',
|primaryKey = 'id',
|preCombineField = 'ts'
|)
|location '$basePath'
""".stripMargin)
spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1')," +
"(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30.0, 1250, 'a3: desc3')")

Merge Into:

// Partial updates using MERGE INTO statement with changed fields: "price" and "_ts"
spark.sql(
s"""
|merge into $tableName t0
|using ( select 1 as id, 'a1' as name, 12 as price, 1001 as _ts
|union select 3 as id, 'a3' as name, 25 as price, 1260 as _ts) s0
|on t0.id = s0.id
|when matched then update set price = s0.price, ts = s0._ts
|""".stripMargin)

The writer schema for this MERGE INTO command, captured when we reach 
HoodieSparkSqlWriter.deduceWriterSchema, is shown below, i.e. the result of:

val writerSchema = HoodieSchemaUtils.deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, parameters)

!image-2024-12-02-03-58-48-178.png!

 

The MERGE INTO command only instructs Hudi to update price and ts (set from 
s0._ts), right? So why are other fields, e.g. name, also picked up from the 
source?

You can check out the test "Test partial update with MOR and Avro log format" 
in TestPartialUpdateForMergeInto.

 

Note: This is partial update support w/ MergeInto btw, not a regular MergeInto.
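
To make the question concrete, here is a minimal sketch of the field set one might expect the writer schema to carry for the statement above. It assumes a partial update only needs the columns assigned in the SET clause plus the record key and precombine field. It is plain Scala with hypothetical names (ExpectedPartialUpdateFields, expectedFields); it is not Hudi's actual schema deduction and does not call any Hudi API.

```scala
// Hypothetical, dependency-free model of the expectation in this issue.
// None of these names are Hudi APIs.
object ExpectedPartialUpdateFields {
  // Columns of the table created in the repro, in declaration order.
  val tableColumns: Seq[String] = Seq("id", "name", "price", "ts", "description")

  // Target columns assigned in "update set price = s0.price, ts = s0._ts".
  val assignedColumns: Set[String] = Set("price", "ts")

  // Assumption: the record key (primaryKey) and precombine field are always written.
  val mandatoryColumns: Set[String] = Set("id", "ts")

  // Keep table order; retain only assigned or mandatory columns.
  def expectedFields: Seq[String] =
    tableColumns.filter(c => assignedColumns(c) || mandatoryColumns(c))

  def main(args: Array[String]): Unit =
    println(expectedFields.mkString(", ")) // prints: id, price, ts
}
```

Under that assumption, name and description would be absent from the writer schema, which is the gap between the expectation and the schema in the attached screenshot.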

 

 

  was:
spark.sql(s"set ${HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
spark.sql(s"set ${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
spark.sql(s"set ${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat")
spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = false")

spark.sql(
s"""
|create table $tableName (
|id int,
|name string,
|price long,
|ts long,
|description string
|) using hudi
|tblproperties(
|type ='$tableType',
|primaryKey = 'id',
|preCombineField = '_ts'
|)
|location '$basePath'
""".stripMargin)
spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, 'a1: desc1')," +
"(2, 'a2', 20, 1200, 'a2: desc2'), (3, 'a3', 30.0, 1250, 'a3: desc3')")

Merge Into:

// Partial updates using MERGE INTO statement with changed fields: "price" and "_ts"
spark.sql(
s"""
|merge into $tableName t0
|using ( select 1 as id, 'a1' as name, 12 as price, 1001 as _ts
|union select 3 as id, 'a3' as name, 25 as price, 1260 as _ts) s0
|on t0.id = s0.id
|when matched then update set price = s0.price, ts = s0._ts
|""".stripMargin)

The writer schema for this MERGE INTO command, captured when we reach 
HoodieSparkSqlWriter.deduceWriterSchema, is shown below, i.e. the result of:

val writerSchema = HoodieSchemaUtils.deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, parameters)

!image-2024-12-02-03-58-48-178.png!

 

The MERGE INTO command only instructs Hudi to update price and ts (set from 
s0._ts), right? So why are other fields, e.g. name, also picked up from the 
source?

You can check out the test "Test partial update with MOR and Avro log format" 
in TestPartialUpdateForMergeInto.

 

Note: This is partial update support w/ MergeInto btw, not a regular MergeInto.

 

 


> Merge Into is pulling in additional fields which are not set as per the 
> condition 
> ----------------------------------------------------------------------------------
>
>                 Key: HUDI-8628
>                 URL: https://issues.apache.org/jira/browse/HUDI-8628
>             Project: Apache Hudi
>          Issue Type: Sub-task
>          Components: spark-sql
>            Reporter: sivabalan narayanan
>            Assignee: Y Ethan Guo
>            Priority: Blocker
>             Fix For: 1.0.1
>
>         Attachments: image-2024-12-02-03-58-48-178.png
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
