Yuan Kui created FLINK-36043:
--------------------------------

             Summary: [SQL] Unexpected SinkMaterializer operator generated when use coalesce function on upsert key fields
                 Key: FLINK-36043
                 URL: https://issues.apache.org/jira/browse/FLINK-36043
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.20.0
            Reporter: Yuan Kui
         Attachments: image-2024-08-13-16-40-27-929.png

As designed, the SinkMaterializer operator should not be generated when the upsert keys are the same as the primary keys. Example:
{code:java}
-- source table
create table source_table (
    col1 int,
    col2 int,
    col3 int,
    col4 int
) with (
    'connector' = 'datagen',
    'rows-per-second'='10',
    'fields.col1.kind'='random',
    'fields.col2.kind'='random',
    'fields.col3.kind'='random',
    'fields.col4.kind'='random'
);

-- sink table
create table sink_table(
    col1 int,
    col2 int,
    col3_cnt bigint,
    col4_max int,
    primary key(col1, col2) not enforced
) with (
    'connector' = 'blackhole'
);

-- sql
insert into sink_table
select
    col1,
    col2,
    count(col3) as col3_cnt,
    max(col4) as col4_max
from source_table
group by col1,col2;{code}
This works as expected, and the execution plan contains no SinkMaterializer operator:
{code:java}
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[col1, col2, col3_cnt, col4_max])
+- GroupAggregate(groupBy=[col1, col2], select=[col1, col2, COUNT(col3) AS col3_cnt, MAX(col4) AS col4_max])
   +- Exchange(distribution=[hash[col1, col2]])
      +- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[col1, col2, col3, col4])
{code}
However, if the coalesce function is applied to an upsert key field, such as:
{code:java}
insert into sink_table
select
    -- use coalesce
    coalesce(col1, 0) as col1,
    col2,
    count(col3) as col3_cnt,
    max(col4) as col4_max
from source_table
group by col1,col2;
{code}
the SinkMaterializer operator is generated (note upsertMaterialize=[true] on the Sink):
{code:java}
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[col1, col2, col3_cnt, col4_max], upsertMaterialize=[true])
+- Calc(select=[coalesce(col1, 0) AS col1, col2, col3_cnt, col4_max])
   +- GroupAggregate(groupBy=[col1, col2], select=[col1, col2, COUNT(col3) AS col3_cnt, MAX(col4) AS col4_max])
      +- Exchange(distribution=[hash[col1, col2]])
         +- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[col1, col2, col3, col4])
{code}
Replacing `coalesce(col1, 0)` with `if(col1 is null, 0, col1)` leads to the same problem.

The code that determines whether a SinkMaterializer operator should be generated is in `org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.SatisfyUpdateKindTraitVisitor#analyzeUpsertMaterializeStrategy`. Setting a breakpoint at line 881, as shown here:

!image-2024-08-13-16-40-27-929.png!

I found that changeLogUpsertKeys is empty, which causes the 'whether to generate SinkMaterializer' decision to always be true.

Is that by design, or is it a bug?

--
This message was sent by Atlassian Jira
(v8.20.10#820010)