Alexey Kudinkin created HUDI-4861:
-------------------------------------

             Summary: Relax MERGE INTO restrictions to permit casting of the 
matching condition
                 Key: HUDI-4861
                 URL: https://issues.apache.org/jira/browse/HUDI-4861
             Project: Apache Hudi
          Issue Type: Bug
    Affects Versions: 0.12.0
            Reporter: Alexey Kudinkin
            Assignee: Alexey Kudinkin
             Fix For: 0.13.0


Reported by user:

[https://github.com/apache/hudi/issues/6626]

 

Following code

 
{code:java}
     target_df = spark.read.format("hudi").load(basePath)
    print("###################################")
    print(target_df.printSchema())
    # # target_df.show()
    target_datatype_map = {}
    for name, dtype in target_df.dtypes:
        target_datatype_map[name] = dtype
    print(str(target_datatype_map))
    print("###################################")

    for col in columns:
        if has_column(deflateDf, col):
            deflateDf = deflateDf.withColumn(col, F.col(col))
        else:
            deflateDf = deflateDf.withColumn(col, F.lit(None))
    deflateDf.createOrReplaceTempView("deflate_table")
    create_sql = "create table RESULTDATA using hudi location 
'/tmp/RESULTDATA_mor'"
    spark.sql(create_sql)
    
    merge_sql = """
    merge into RESULTDATA as target
        using (
            select * from deflate_table as deflate
        )
        on target._context_id_ = deflate._context_id_ and target.id = deflate.id
        when matched
        then update set
        target.CREATED = cast(if(array_contains(deflate.changed_cols, 
'CREATED'), deflate.CREATED, target.CREATED) as timestamp),target.CREATEDBY = 
cast(if(array_contains(deflate.changed_cols, 'CREATEDBY'), deflate.CREATEDBY, 
target.CREATEDBY) as string),target.DELETED = 
cast(if(array_contains(deflate.changed_cols, 'DELETED'), deflate.DELETED, 
target.DELETED) as timestamp),target.DELETEDBY = 
cast(if(array_contains(deflate.changed_cols, 'DELETEDBY'), deflate.DELETEDBY, 
target.DELETEDBY) as string),target.EXPIRATIONDATE = 
cast(if(array_contains(deflate.changed_cols, 'EXPIRATIONDATE'), 
deflate.EXPIRATIONDATE, target.EXPIRATIONDATE) as timestamp),target.ID = 
cast(if(array_contains(deflate.changed_cols, 'ID'), deflate.ID, target.ID) as 
decimal(12,0)),target.KEY = cast(if(array_contains(deflate.changed_cols, 
'KEY'), deflate.KEY, target.KEY) as string),target.LASTMODIFIED = 
cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIED'), 
deflate.LASTMODIFIED, target.LASTMODIFIED) as timestamp),target.LASTMODIFIEDBY 
= cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIEDBY'), 
deflate.LASTMODIFIEDBY, target.LASTMODIFIEDBY) as string),target.ORDERING = 
cast(if(array_contains(deflate.changed_cols, 'ORDERING'), deflate.ORDERING, 
target.ORDERING) as decimal(12,0)),target.RESULTID = 
cast(if(array_contains(deflate.changed_cols, 'RESULTID'), deflate.RESULTID, 
target.RESULTID) as decimal(12,0)),target.REPORTINGPERIODTYPE = 
cast(if(array_contains(deflate.changed_cols, 'REPORTINGPERIODTYPE'), 
deflate.REPORTINGPERIODTYPE, target.REPORTINGPERIODTYPE) as 
string),target.RESULTDATE = cast(if(array_contains(deflate.changed_cols, 
'RESULTDATE'), deflate.RESULTDATE, target.RESULTDATE) as 
timestamp),target.SATISFYINGNUMERATOR = 
cast(if(array_contains(deflate.changed_cols, 'SATISFYINGNUMERATOR'), 
deflate.SATISFYINGNUMERATOR, target.SATISFYINGNUMERATOR) as 
decimal(12,0)),target.VALUE = cast(if(array_contains(deflate.changed_cols, 
'VALUE'), deflate.VALUE, target.VALUE) as string),target._ETL_RUN_ID_ = 
cast(if(array_contains(deflate.changed_cols, '_ETL_RUN_ID_'), 
deflate._ETL_RUN_ID_, target._ETL_RUN_ID_) as 
decimal(38,0)),target._ETL_MODIFIED_ = 
cast(if(array_contains(deflate.changed_cols, '_ETL_MODIFIED_'), 
deflate._ETL_MODIFIED_, target._ETL_MODIFIED_) as timestamp),target._EXTRACTED_ 
= cast(if(array_contains(deflate.changed_cols, '_EXTRACTED_'), 
deflate._EXTRACTED_, target._EXTRACTED_) as 
timestamp),target._SOURCE_EXTRACTED_ = 
cast(if(array_contains(deflate.changed_cols, '_SOURCE_EXTRACTED_'), 
deflate._SOURCE_EXTRACTED_, target._SOURCE_EXTRACTED_) as 
timestamp),target._LAST_MODIFIED_SEQ_ = 
cast(if(array_contains(deflate.changed_cols, '_LAST_MODIFIED_SEQ_'), 
deflate._LAST_MODIFIED_SEQ_, target._LAST_MODIFIED_SEQ_) as 
decimal(38,0)),target._SCHEMA_CLASS_ = 
cast(if(array_contains(deflate.changed_cols, '_SCHEMA_CLASS_'), 
deflate._SCHEMA_CLASS_, target._SCHEMA_CLASS_) as string),target._CONTEXT_ID_ = 
cast(if(array_contains(deflate.changed_cols, '_CONTEXT_ID_'), 
deflate._CONTEXT_ID_, target._CONTEXT_ID_) as 
decimal(12,0)),target._IS_DELETED_ = 
cast(if(array_contains(deflate.changed_cols, '_IS_DELETED_'), 
deflate._IS_DELETED_, target._IS_DELETED_) as boolean)
        when not matched
        then insert
        
(CREATED,CREATEDBY,DELETED,DELETEDBY,EXPIRATIONDATE,ID,KEY,LASTMODIFIED,LASTMODIFIEDBY,ORDERING,RESULTID,REPORTINGPERIODTYPE,RESULTDATE,SATISFYINGNUMERATOR,VALUE,_ETL_RUN_ID_,_ETL_MODIFIED_,_EXTRACTED_,_SOURCE_EXTRACTED_,_LAST_MODIFIED_SEQ_,_SCHEMA_CLASS_,_CONTEXT_ID_,_IS_DELETED_)
 values (cast(deflate.CREATED as timestamp),cast(deflate.CREATEDBY as 
string),cast(deflate.DELETED as timestamp),cast(deflate.DELETEDBY as 
string),cast(deflate.EXPIRATIONDATE as timestamp),cast(deflate.ID as 
decimal(12,0)),cast(deflate.KEY as string),cast(deflate.LASTMODIFIED as 
timestamp),cast(deflate.LASTMODIFIEDBY as string),cast(deflate.ORDERING as 
decimal(12,0)),cast(deflate.RESULTID as 
decimal(12,0)),cast(deflate.REPORTINGPERIODTYPE as 
string),cast(deflate.RESULTDATE as timestamp),cast(deflate.SATISFYINGNUMERATOR 
as decimal(12,0)),cast(deflate.VALUE as string),cast(deflate._ETL_RUN_ID_ as 
decimal(38,0)),cast(deflate._ETL_MODIFIED_ as 
timestamp),cast(deflate._EXTRACTED_ as 
timestamp),cast(deflate._SOURCE_EXTRACTED_ as 
timestamp),cast(deflate._LAST_MODIFIED_SEQ_ as 
decimal(38,0)),cast(deflate._SCHEMA_CLASS_ as string),cast(deflate._CONTEXT_ID_ 
as decimal(12,0)),cast(deflate._IS_DELETED_ as boolean))
        """
    spark.sql(merge_sql) {code}
 

 

Results in the exception being thrown:

 
{code:java}
/09/07 18:47:12 INFO HoodieTableMetaClient: Finished Loading Table of type 
MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from file:///tmp/RESULTDATA_mor
22/09/07 18:47:12 INFO HoodieActiveTimeline: Loaded instants upto : 
Option{val=[20220907150126010__deltacommit__COMPLETED]}
Traceback (most recent call last):
  File "/Users/parunkarthick/cdc-poc/main.py", line 971, in <module>
    process_table(deflate_df, tableName, table_cols[tableNames[0]], 
concurrent_write_enabled, delete_insert_enabled)
  File "/Users/parunkarthick/cdc-poc/main.py", line 767, in process_table
    merge_into_hudi(table_name, df, table_cols)
  File "/Users/parunkarthick/cdc-poc/main.py", line 599, in merge_into_hudi
    target_rows = spark.sql(sql)
  File 
"/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/sql/session.py",
 line 723, in sql
  File 
"/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
 line 1304, in __call__
  File 
"/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/sql/utils.py",
 line 117, in deco
pyspark.sql.utils.AnalysisException: Invalidate Merge-On condition: 
(CAST(target.`id` AS DECIMAL(20,0)) = CAST(CAST(deflate.`id` AS DECIMAL(20,0)) 
AS DECIMAL(20,0))).The validate condition should be 'targetColumn = 
sourceColumnExpression', e.g. t.id = s.id and t.dt = from_unixtime(s.ts) {code}
 

This occurs due to the fact that current impl of 
{{MergeIntoHoodieTableCommand}} restricts target table's primary key to be just 
an {{{}AttributeReference{}}}, which in this case is wrapped into a {{Cast}}

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to