[ https://issues.apache.org/jira/browse/FLINK-36043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17873210#comment-17873210 ]
Yun Tang edited comment on FLINK-36043 at 8/13/24 1:58 PM:
-----------------------------------------------------------

[~catyee] I think the root cause is the difference in {{sink.getInput}}. With the {{cast}}, the input of the sink is {{StreamPhysicalGroupAggregate}}, while with the {{coalesce}} function, the input of the sink is {{StreamPhysicalCalc.STREAM_PHYSICAL.any.None: 0.[I,U].[NONE](input=StreamPhysicalGroupAggregate#235,select=FLOOR(col1) AS col1, col2, col3_cnt, col4_max)}}. That's why the {{changeLogUpsertKeys}} are empty.

If we simply change the code to take the upsert keys from the input of that Calc (the aggregate node), it produces the correct result:

~~~scala
// before: upsert keys of the sink's direct input (the Calc), which come back empty
// val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
// after: upsert keys of the Calc's input (the GroupAggregate)
val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput.getInput(0))
~~~

I think this could be improved to search for the first aggregate input and avoid such cases. [~lincoln.86xy], [~jark] what do you think?
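The comment proposes looking past the Calc to the aggregate's upsert keys. That proposal can be sketched with a toy plan model.

All names here are hypothetical illustrations, not Flink or Calcite APIs: `UpsertKeyLookup`, `PlanNode`, `Aggregate`, `Calc`, `Scan`, `direct` and `skipCalcs`. Upsert keys are modeled as sets of column names rather than the field-index bit sets the planner actually uses.

```java
import java.util.List;
import java.util.Set;

// Hypothetical, simplified plan model for illustration only; these types are
// NOT Flink's or Calcite's RelNode API. Upsert keys are modeled as sets of
// column names, and an empty set means "no upsert key could be derived".
final class UpsertKeyLookup {

    interface PlanNode {
        Set<Set<String>> upsertKeys();
    }

    // A group aggregate is unique on its grouping columns.
    record Aggregate(List<String> groupBy, PlanNode input) implements PlanNode {
        public Set<Set<String>> upsertKeys() {
            return Set.of(Set.copyOf(groupBy));
        }
    }

    // Mirrors what the issue observed: no upsert keys are derived through the Calc.
    record Calc(String select, PlanNode input) implements PlanNode {
        public Set<Set<String>> upsertKeys() {
            return Set.of();
        }
    }

    record Scan(String table) implements PlanNode {
        public Set<Set<String>> upsertKeys() {
            return Set.of();
        }
    }

    // Current behaviour: ask only the sink's direct input for its upsert keys.
    static Set<Set<String>> direct(PlanNode sinkInput) {
        return sinkInput.upsertKeys();
    }

    // Proposed behaviour (sketch): skip Calc nodes until reaching the first
    // non-Calc node (the aggregate in this plan) and use its upsert keys.
    static Set<Set<String>> skipCalcs(PlanNode sinkInput) {
        PlanNode node = sinkInput;
        while (node instanceof Calc calc) {
            node = calc.input();
        }
        return node.upsertKeys();
    }

    public static void main(String[] args) {
        PlanNode agg = new Aggregate(List.of("col1", "col2"), new Scan("source_table"));
        PlanNode sinkInput = new Calc("coalesce(col1, 0) AS col1, col2, col3_cnt, col4_max", agg);
        System.out.println(direct(sinkInput).isEmpty());                             // true
        System.out.println(skipCalcs(sinkInput).contains(Set.of("col1", "col2")));   // true
    }
}
```

The sketch assumes that skipping a Calc is safe, meaning its projection keeps distinct key values distinct. `coalesce(col1, 0)` maps both NULL and 0 to 0, so the groups (NULL, x) and (0, x) collapse onto the same sink key. A real change would therefore need to decide which expressions may preserve an upsert key.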
> [SQL] Unexpected SinkMaterializer operator generated when use coalesce function on upsert key fields
> ----------------------------------------------------------------------------------------------------
>
> Key: FLINK-36043
> URL: https://issues.apache.org/jira/browse/FLINK-36043
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.20.0
> Reporter: Yuan Kui
> Priority: Major
> Attachments: image-2024-08-13-16-40-27-929.png
>
> As designed, the SinkMaterializer operator should not be generated when the upsert keys are the same as the primary keys. Example:
> {code:java}
> -- source table
> create table source_table (
>   col1 int,
>   col2 int,
>   col3 int,
>   col4 int
> ) with (
>   'connector' = 'datagen',
>   'rows-per-second'='10'
> );
> -- sink table
> create table sink_table(
>   col1 int,
>   col2 int,
>   col3_cnt bigint,
>   col4_max int,
>   primary key(col1, col2) not enforced
> ) with (
>   'connector' = 'blackhole'
> );
> -- sql
> insert into sink_table
> select
>   col1,
>   col2,
>   count(col3) as col3_cnt,
>   max(col4) as col4_max
> from source_table
> group by col1,col2;
> {code}
> It works well, and the execution plan has no SinkMaterializer operator:
> {code:java}
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.sink_table], fields=[col1, col2, col3_cnt, col4_max])
> +- GroupAggregate(groupBy=[col1, col2], select=[col1, col2, COUNT(col3) AS col3_cnt, MAX(col4) AS col4_max])
>    +- Exchange(distribution=[hash[col1, col2]])
>       +- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[col1, col2, col3, col4])
> {code}
> However, if we apply the coalesce function to an upsert key field, for example:
> {code:java}
> insert into sink_table
> select
>   -- use coalesce
>   coalesce(col1, 0) as col1,
>   col2,
>   count(col3) as col3_cnt,
>   max(col4) as col4_max
> from source_table
> group by col1,col2;
> {code}
> the SinkMaterializer operator is generated:
> {code:java}
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.sink_table], fields=[col1, col2, col3_cnt, col4_max], upsertMaterialize=[true])
> +- Calc(select=[coalesce(col1, 0) AS col1, col2, col3_cnt, col4_max])
>    +- GroupAggregate(groupBy=[col1, col2], select=[col1, col2, COUNT(col3) AS col3_cnt, MAX(col4) AS col4_max])
>       +- Exchange(distribution=[hash[col1, col2]])
>          +- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[col1, col2, col3, col4])
> {code}
> Changing `coalesce(col1, 0)` to `if(col1 is null, 0, col1)` runs into the same problem.
>
> The code that determines whether a SinkMaterializer operator should be generated is in `org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.SatisfyUpdateKindTraitVisitor#analyzeUpsertMaterializeStrategy`. Setting a breakpoint at line 881, as shown here:
> !image-2024-08-13-16-40-27-929.png!
> I found that changeLogUpsertKeys is empty, which makes the "whether to generate SinkMaterializer" decision always true.
> Is that by design or a bug?
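The quoted issue describes a materialization rule. No SinkMaterializer should be generated when an upsert key matches the sink's primary key, and empty upsert keys force one.

That rule can be modeled as a small predicate. This is a hypothetical simplification of the rule as stated in the issue, not the code of `analyzeUpsertMaterializeStrategy`. `MaterializeDecision` and `needsUpsertMaterialize` are illustration names, and keys are modeled as sets of column names.

```java
import java.util.Set;

// Hypothetical model of the rule described in the issue; NOT Flink's planner code.
final class MaterializeDecision {

    // Returns true when the sink would get a SinkMaterializer under this model.
    static boolean needsUpsertMaterialize(Set<Set<String>> upsertKeys, Set<String> primaryKey) {
        // No derivable upsert key: the planner cannot show that the input is
        // keyed by the sink's primary key, so it conservatively materializes.
        if (upsertKeys.isEmpty()) {
            return true;
        }
        // Skip materialization only when some upsert key equals the primary key.
        return upsertKeys.stream().noneMatch(primaryKey::equals);
    }

    public static void main(String[] args) {
        Set<String> pk = Set.of("col1", "col2");
        // Plan without the Calc: the aggregate's group-by keys reach the sink.
        System.out.println(needsUpsertMaterialize(Set.of(Set.of("col1", "col2")), pk)); // false
        // Plan with coalesce(col1, 0): the upsert keys come back empty.
        System.out.println(needsUpsertMaterialize(Set.of(), pk)); // true
    }
}
```

Under this model, the empty `changeLogUpsertKeys` observed at the breakpoint is enough on its own to produce `upsertMaterialize=[true]`, regardless of the sink's primary key.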