[ https://issues.apache.org/jira/browse/FLINK-36043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yuan Kui updated FLINK-36043:
-----------------------------
Description:
As designed, the SinkMaterializer operator should not be generated when the upsert keys are the same as the primary keys.

Example:
{code:sql}
-- source table
create table source_table (
  col1 int,
  col2 int,
  col3 int,
  col4 int
) with (
  'connector' = 'datagen',
  'rows-per-second' = '10'
);

-- sink table
create table sink_table (
  col1 int,
  col2 int,
  col3_cnt bigint,
  col4_max int,
  primary key (col1, col2) not enforced
) with (
  'connector' = 'blackhole'
);

-- sql
insert into sink_table
select
  col1,
  col2,
  count(col3) as col3_cnt,
  max(col4) as col4_max
from source_table
group by col1, col2;
{code}
This works as expected, and the execution plan contains no SinkMaterializer operator:
{noformat}
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[col1, col2, col3_cnt, col4_max])
+- GroupAggregate(groupBy=[col1, col2], select=[col1, col2, COUNT(col3) AS col3_cnt, MAX(col4) AS col4_max])
   +- Exchange(distribution=[hash[col1, col2]])
      +- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[col1, col2, col3, col4])
{noformat}
However, if we apply the coalesce function to an upsert key field, for example:
{code:sql}
insert into sink_table
select
  -- use coalesce
  coalesce(col1, 0) as col1,
  col2,
  count(col3) as col3_cnt,
  max(col4) as col4_max
from source_table
group by col1, col2;
{code}
a SinkMaterializer operator is generated (note upsertMaterialize=[true] on the Sink):
{noformat}
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[col1, col2, col3_cnt, col4_max], upsertMaterialize=[true])
+- Calc(select=[coalesce(col1, 0) AS col1, col2, col3_cnt, col4_max])
   +- GroupAggregate(groupBy=[col1, col2], select=[col1, col2, COUNT(col3) AS col3_cnt, MAX(col4) AS col4_max])
      +- Exchange(distribution=[hash[col1, col2]])
         +- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[col1, col2, col3, col4])
{noformat}
Changing `coalesce(col1, 0)` to `if(col1 is null, 0, col1)` causes the same problem.

The code that decides whether a SinkMaterializer operator should be generated is in `org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.SatisfyUpdateKindTraitVisitor#analyzeUpsertMaterializeStrategy`. Setting a breakpoint at line 881, as shown here:
!image-2024-08-13-16-40-27-929.png!
I found that changeLogUpsertKeys is empty, which causes the "generate SinkMaterializer" decision to always be true.

Is this by design or a bug?

> [SQL] Unexpected SinkMaterializer operator generated when use coalesce function on upsert key fields
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36043
>                 URL: https://issues.apache.org/jira/browse/FLINK-36043
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0
>            Reporter: Yuan Kui
>            Priority: Major
>         Attachments: image-2024-08-13-16-40-27-929.png

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
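The report's finding that changeLogUpsertKeys is empty after the coalesce projection can be illustrated with a toy model. This is a minimal sketch, not Flink's planner code: the class UpsertKeyModel, the record Expr and the methods projectUpsertKeys, needsMaterialize and coalesce are hypothetical, and the two rules they encode are assumptions. Rule 1: an upsert key survives a projection only if every key column is forwarded as a plain column reference. Rule 2: following the report's wording, materialization is skipped only when some surviving upsert key equals the sink primary key. The last line shows one plausible reason such behavior could be intentional: coalesce(col1, 0) maps both NULL and 0 to 0, so two distinct aggregate groups, (NULL, c2) and (0, c2), can land on the same sink key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Toy model, NOT Flink's planner code: all names and both rules are hypothetical.
public class UpsertKeyModel {

    // One projected output column: a plain reference to input column `input`,
    // or some other expression computed from it (e.g. coalesce(col1, 0)).
    record Expr(int input, boolean plainRef) {
        static Expr ref(int input) { return new Expr(input, true); }
        static Expr call(int input) { return new Expr(input, false); }
    }

    // Assumed rule 1: an upsert key survives the projection only if every key
    // column is forwarded unchanged; surviving keys are re-indexed to output positions.
    static List<Set<Integer>> projectUpsertKeys(List<Set<Integer>> inputKeys, List<Expr> projection) {
        List<Set<Integer>> result = new ArrayList<>();
        for (Set<Integer> key : inputKeys) {
            Set<Integer> mapped = new TreeSet<>();
            for (int col : key) {
                for (int out = 0; out < projection.size(); out++) {
                    Expr e = projection.get(out);
                    if (e.plainRef() && e.input() == col) {
                        mapped.add(out);
                        break;
                    }
                }
            }
            if (mapped.size() == key.size()) { // every key column was forwarded as-is
                result.add(mapped);
            }
        }
        return result;
    }

    // Assumed rule 2 (per the report): skip the SinkMaterializer only when some
    // upsert key equals the sink primary key. An empty key list forces materialization.
    static boolean needsMaterialize(List<Set<Integer>> upsertKeys, Set<Integer> primaryKey) {
        return upsertKeys.stream().noneMatch(primaryKey::equals);
    }

    // Simplified stand-in for SQL COALESCE(value, fallback).
    static Integer coalesce(Integer value, int fallback) {
        return value != null ? value : fallback;
    }

    public static void main(String[] args) {
        List<Set<Integer>> aggKeys = List.of(Set.of(0, 1)); // GroupAggregate keyed on (col1, col2)
        Set<Integer> sinkPk = Set.of(0, 1);                 // primary key (col1, col2)

        List<Expr> plain = List.of(Expr.ref(0), Expr.ref(1), Expr.ref(2), Expr.ref(3));
        List<Expr> withCoalesce = List.of(Expr.call(0), Expr.ref(1), Expr.ref(2), Expr.ref(3));

        System.out.println(needsMaterialize(projectUpsertKeys(aggKeys, plain), sinkPk));        // false
        System.out.println(needsMaterialize(projectUpsertKeys(aggKeys, withCoalesce), sinkPk)); // true

        // coalesce is not injective: groups (NULL, c2) and (0, c2) map to the same sink key.
        System.out.println(Objects.equals(coalesce(null, 0), coalesce(0, 0)));                // true
    }
}
```

Under these assumptions the plain projection keeps the key {col1, col2}, so no materializer is needed, while the coalesce projection drops it, leaving an empty key list exactly as observed at the breakpoint. Whether Flink drops the key for this reason is the open question of this issue; the model only shows that the two observations are consistent.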