[ 
https://issues.apache.org/jira/browse/FLINK-36043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17873210#comment-17873210
 ] 

Yun Tang edited comment on FLINK-36043 at 8/13/24 1:58 PM:
-----------------------------------------------------------

[~catyee] I think the root cause is the difference in {{sink.getInput}} between the two queries.

With the {{cast}}, the input of the sink is {{StreamPhysicalGroupAggregate}}, while 
with the {{coalesce}} function, the input of the sink is 
{{StreamPhysicalCalc.STREAM_PHYSICAL.any.None: 
0.[I,U].[NONE](input=StreamPhysicalGroupAggregate#235,select=FLOOR(col1) AS 
col1, col2, col3_cnt, col4_max)}}. That is why {{changeLogUpsertKeys}} is 
empty. If we simply change the code to read the upsert keys from the Calc's 
input (the aggregate node), it produces the correct result:
~~~scala
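// Original: the sink's direct input is the Calc, whose upsert keys are empty
// val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
// Experiment: read the upsert keys from the Calc's input (the GroupAggregate) instead
val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput.getInput(0))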
~~~
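To illustrate why the upsert keys disappear behind the Calc, here is a toy model. It assumes a simplified rule in which a Calc keeps an input upsert key only when every key column is forwarded unchanged as a plain column reference. The types `Expr`, `Ref`, `Lit`, `Call` and the functions `aggUpsertKeys` and `calcUpsertKeys` are hypothetical; this is not Flink's `FlinkRelMdUpsertKeys` handler, which may treat some expressions differently.

~~~scala
// Hypothetical toy model, NOT Flink's FlinkRelMdUpsertKeys. Assumption: a Calc
// keeps an input upsert key only if every key column is forwarded unchanged.
sealed trait Expr
final case class Ref(col: String) extends Expr                   // plain column reference
final case class Lit(value: Int) extends Expr                    // literal, e.g. 0
final case class Call(fn: String, args: List[Expr]) extends Expr // e.g. COALESCE(col1, 0)

// A group aggregate is keyed by its group-by columns.
def aggUpsertKeys(groupBy: Set[String]): Set[Set[String]] = Set(groupBy)

// Upsert keys after a Calc whose projection maps output name -> expression.
def calcUpsertKeys(inputKeys: Set[Set[String]], projection: Map[String, Expr]): Set[Set[String]] = {
  // input column -> output name, only for columns forwarded as plain references
  val forwarded: Map[String, String] = projection.collect { case (out, Ref(in)) => in -> out }
  // keep a key only if all of its columns survive unchanged, renamed to output names
  inputKeys.collect { case key if key.forall(forwarded.contains) => key.map(forwarded) }
}

val aggKeys = aggUpsertKeys(Set("col1", "col2"))
val plain: Map[String, Expr] = Map(
  "col1" -> Ref("col1"), "col2" -> Ref("col2"),
  "col3_cnt" -> Ref("col3_cnt"), "col4_max" -> Ref("col4_max"))
val withCoalesce = plain.updated("col1", Call("COALESCE", List(Ref("col1"), Lit(0))))

val keysPlain = calcUpsertKeys(aggKeys, plain)           // Set(Set(col1, col2))
val keysCoalesce = calcUpsertKeys(aggKeys, withCoalesce) // empty: col1 is no longer forwarded
~~~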

I think this could be improved by searching for the first aggregate input, to avoid 
such cases. [~lincoln.86xy], [~jark], what do you think?
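The "search for the first aggregate input" idea can be sketched as a walk down single-input nodes from the sink's input. This is a minimal sketch on hypothetical toy node types (`PlanNode`, `Scan`, `GroupAgg`, `Calc`, `firstAggregate`), not Flink's Calcite `RelNode` classes. It also assumes that skipping the Calc is safe; a real implementation would still need to check that the Calc keeps the key columns distinct, since, for example, `coalesce(col1, 0)` maps both NULL and 0 to 0, so two different groups could end up with the same sink key.

~~~scala
// Hypothetical toy plan nodes; Flink's real plan nodes are Calcite RelNodes such
// as StreamPhysicalCalc and StreamPhysicalGroupAggregate.
sealed trait PlanNode { def inputs: List[PlanNode] }
final case class Scan(table: String) extends PlanNode { def inputs: List[PlanNode] = Nil }
final case class GroupAgg(groupBy: Set[String], input: PlanNode) extends PlanNode {
  def inputs: List[PlanNode] = List(input)
}
final case class Calc(select: List[String], input: PlanNode) extends PlanNode {
  def inputs: List[PlanNode] = List(input)
}

// Walk down single-input nodes from the sink's input until the first aggregate.
@scala.annotation.tailrec
def firstAggregate(node: PlanNode): Option[GroupAgg] = node match {
  case agg: GroupAgg => Some(agg)
  case other =>
    other.inputs match {
      case single :: Nil => firstAggregate(single)
      case _             => None // leaf or multi-input node: stop searching
    }
}

// Mirrors the plan in the comment: Calc(FLOOR(col1) AS col1, ...) over a GroupAggregate.
val sinkInput = Calc(
  List("FLOOR(col1) AS col1", "col2", "col3_cnt", "col4_max"),
  GroupAgg(Set("col1", "col2"), Scan("source_table")))

val aggKeys = firstAggregate(sinkInput).map(_.groupBy) // Some(Set(col1, col2))
~~~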



> [SQL] Unexpected SinkMaterializer operator generated when use coalesce 
> function on upsert key fields
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36043
>                 URL: https://issues.apache.org/jira/browse/FLINK-36043
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0
>            Reporter: Yuan Kui
>            Priority: Major
>         Attachments: image-2024-08-13-16-40-27-929.png
>
>
> As designed, the SinkMaterializer operator should not be generated when the 
> upsert keys are the same as the primary keys. Example:
> {code:java}
> -- source table
> create table source_table (
>    col1 int,
>    col2 int,
>    col3 int,
>    col4 int
> ) with (
>    'connector' = 'datagen',
>    'rows-per-second'='10'
> );
> -- sink table
> create table sink_table(
>    col1 int,
>    col2 int,
>    col3_cnt bigint,
>    col4_max int,
>    primary key(col1, col2) not enforced
> ) with (
>    'connector' = 'blackhole'
> );
> -- sql
> insert into sink_table
> select
>   col1,
>   col2,
>   count(col3) as col3_cnt,
>   max(col4) as col4_max
> from source_table
> group by col1,col2;{code}
> It works well, and the execution plan has no SinkMaterializer operator:
> {code:java}
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.sink_table], fields=[col1, col2, col3_cnt, col4_max])
> +- GroupAggregate(groupBy=[col1, col2], select=[col1, col2, COUNT(col3) AS col3_cnt, MAX(col4) AS col4_max])
>    +- Exchange(distribution=[hash[col1, col2]])
>       +- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[col1, col2, col3, col4]) {code}
> However, if we apply the coalesce function to an upsert key field, such as:
> {code:java}
> insert into sink_table
> select
>   -- use coalesce
>   coalesce(col1, 0) as col1, 
>   col2,
>   count(col3) as col3_cnt,
>   max(col4) as col4_max
> from source_table
> group by col1,col2; {code}
> the SinkMaterializer operator will be generated:
> {code:java}
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.sink_table], fields=[col1, col2, col3_cnt, col4_max], upsertMaterialize=[true])
> +- Calc(select=[coalesce(col1, 0) AS col1, col2, col3_cnt, col4_max])
>    +- GroupAggregate(groupBy=[col1, col2], select=[col1, col2, COUNT(col3) AS col3_cnt, MAX(col4) AS col4_max])
>       +- Exchange(distribution=[hash[col1, col2]])
>          +- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[col1, col2, col3, col4]) {code}
> Changing `coalesce(col1, 0)` to `if(col1 is null, 0, col1)` runs into the 
> same problem.
>  
> The code for determining whether a SinkMaterializer operator should be 
> generated is in 
> `org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.SatisfyUpdateKindTraitVisitor#analyzeUpsertMaterializeStrategy`.
> If I set a breakpoint at line 881, as shown here:
> !image-2024-08-13-16-40-27-929.png!
> I found that changeLogUpsertKeys is empty, which makes the 'whether to 
> generate SinkMaterializer' check always evaluate to true.
> Is that by design, or is it a bug?
>  
>  
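The decision the reporter describes can be modeled in a few lines. This is a sketch under stated assumptions: `needsUpsertMaterialize` is a hypothetical name, and the rule (materialize unless some upsert key of the sink's input equals the sink's primary key) is taken from the issue text rather than from the planner source, which may differ in detail.

~~~scala
// Hypothetical, simplified model of the rule stated in the issue: no
// SinkMaterializer when an upsert key of the sink's input equals the sink's
// primary key. The real check in analyzeUpsertMaterializeStrategy may differ.
def needsUpsertMaterialize(sinkPk: Set[String], inputUpsertKeys: Set[Set[String]]): Boolean =
  !inputUpsertKeys.contains(sinkPk)

val pk = Set("col1", "col2")
// Plain GROUP BY col1, col2: the aggregate's upsert key matches the primary key.
val plainQuery = needsUpsertMaterialize(pk, Set(Set("col1", "col2"))) // false
// coalesce(col1, 0) in a Calc above the aggregate: no upsert keys are reported.
val coalesceQuery = needsUpsertMaterialize(pk, Set.empty)             // true
~~~

With empty upsert keys the check can never find a match, which is why the materializer is always added in the coalesce case.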



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
